diff options
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execParallel.c | 33 | ||||
-rw-r--r-- | src/backend/executor/nodeGather.c | 54 |
2 files changed, 66 insertions, 21 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index efcbaef416c..99a9de3cdc3 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node, ExecParallelEstimateContext *e); static bool ExecParallelInitializeDSM(PlanState *node, ExecParallelInitializeDSMContext *d); -static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt); +static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, + bool reinitialize); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); @@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate, * to the main backend and start the workers. */ static shm_mq_handle ** -ExecParallelSetupTupleQueues(ParallelContext *pcxt) +ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) { shm_mq_handle **responseq; char *tqueuespace; @@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt) responseq = (shm_mq_handle **) palloc(pcxt->nworkers * sizeof(shm_mq_handle *)); - /* Allocate space from the DSM for the queues themselves. */ - tqueuespace = shm_toc_allocate(pcxt->toc, - PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); + /* + * If not reinitializing, allocate space from the DSM for the queues; + * otherwise, find the already allocated space. + */ + if (!reinitialize) + tqueuespace = + shm_toc_allocate(pcxt->toc, + PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); + else + tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE); /* Create the queues, and become the receiver for each. */ for (i = 0; i < pcxt->nworkers; ++i) @@ -248,13 +256,24 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt) } /* Add array of queues to shm_toc, so others can find it. */ - shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace); + if (!reinitialize) + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace); /* Return array of handles. */ return responseq; } /* + * Re-initialize the response queues for backend workers to return tuples + * to the main backend and start the workers. + */ +shm_mq_handle ** +ExecParallelReinitializeTupleQueues(ParallelContext *pcxt) +{ + return ExecParallelSetupTupleQueues(pcxt, true); +} + +/* * Sets up the required infrastructure for backend workers to perform * execution and return results to the main backend. */ @@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) pei->buffer_usage = bufusage_space; /* Set up tuple queues. */ - pei->tqueue = ExecParallelSetupTupleQueues(pcxt); + pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); /* * If instrumentation options were supplied, allocate space for the diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 9c1533e3113..5f589614dc2 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -41,6 +41,7 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate); +static void ExecShutdownGatherWorkers(GatherState *node); /* ---------------------------------------------------------------- @@ -150,9 +151,10 @@ ExecGather(GatherState *node) bool got_any_worker = false; /* Initialize the workers required to execute Gather node. */ - node->pei = ExecInitParallelPlan(node->ps.lefttree, - estate, - gather->num_workers); + if (!node->pei) + node->pei = ExecInitParallelPlan(node->ps.lefttree, + estate, + gather->num_workers); /* * Register backend workers. We might not get as many as we @@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate) gatherstate->need_to_scan_locally, &done); if (done) - ExecShutdownGather(gatherstate); + ExecShutdownGatherWorkers(gatherstate); if (HeapTupleIsValid(tup)) { @@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate) } /* ---------------------------------------------------------------- - * ExecShutdownGather + * ExecShutdownGatherWorkers * - * Destroy the setup for parallel workers. Collect all the - * stats after workers are stopped, else some work done by - * workers won't be accounted. + * Destroy the parallel workers. Collect all the stats after + * workers are stopped, else some work done by workers won't be + * accounted. * ---------------------------------------------------------------- */ void -ExecShutdownGather(GatherState *node) +ExecShutdownGatherWorkers(GatherState *node) { /* Shut down tuple queue funnel before shutting down workers. */ if (node->funnel != NULL) @@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node) /* Now shut down the workers. */ if (node->pei != NULL) - { ExecParallelFinish(node->pei); +} + +/* ---------------------------------------------------------------- + * ExecShutdownGather + * + * Destroy the setup for parallel workers including parallel context. + * Collect all the stats after workers are stopped, else some work + * done by workers won't be accounted. + * ---------------------------------------------------------------- + */ +void +ExecShutdownGather(GatherState *node) +{ + ExecShutdownGatherWorkers(node); + + /* Now destroy the parallel context. */ + if (node->pei != NULL) + { ExecParallelCleanup(node->pei); node->pei = NULL; } @@ -349,14 +368,21 @@ void ExecReScanGather(GatherState *node) { /* - * Re-initialize the parallel context and workers to perform rescan of - * relation. We want to gracefully shutdown all the workers so that they + * Re-initialize the parallel workers to perform rescan of relation. + * We want to gracefully shutdown all the workers so that they * should be able to propagate any error or other information to master - * backend before dying. + * backend before dying. Parallel context will be reused for rescan. */ - ExecShutdownGather(node); + ExecShutdownGatherWorkers(node); node->initialized = false; + if (node->pei) + { + ReinitializeParallelDSM(node->pei->pcxt); + node->pei->tqueue = + ExecParallelReinitializeTupleQueues(node->pei->pcxt); + } + ExecReScan(node->ps.lefttree); } |