Diffstat (limited to 'src/backend/executor')
-rw-r--r--  src/backend/executor/execParallel.c  | 155
-rw-r--r--  src/backend/executor/nodeSort.c      |  97
2 files changed, 186 insertions, 66 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ad9eba63dd3..01316ff5d94 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -28,9 +28,10 @@
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
-#include "executor/nodeSeqscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeIndexonlyscan.h"
+#include "executor/nodeSeqscan.h"
+#include "executor/nodeSort.h"
 #include "executor/tqueue.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/planmain.h"
@@ -202,10 +203,10 @@ ExecSerializePlan(Plan *plan, EState *estate)
 }
 
 /*
- * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
- * may need some state which is shared across all parallel workers.  Before
- * we size the DSM, give them a chance to call shm_toc_estimate_chunk or
- * shm_toc_estimate_keys on &pcxt->estimator.
+ * Parallel-aware plan nodes (and occasionally others) may need some state
+ * which is shared across all parallel workers.  Before we size the DSM, give
+ * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
+ * &pcxt->estimator.
  *
  * While we're at it, count the number of PlanState nodes in the tree, so
  * we know how many SharedPlanStateInstrumentation structures we need.
@@ -219,38 +220,43 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
     /* Count this node. */
     e->nnodes++;
 
-    /* Call estimators for parallel-aware nodes. */
-    if (planstate->plan->parallel_aware)
+    switch (nodeTag(planstate))
     {
-        switch (nodeTag(planstate))
-        {
-            case T_SeqScanState:
+        case T_SeqScanState:
+            if (planstate->plan->parallel_aware)
                 ExecSeqScanEstimate((SeqScanState *) planstate,
                                     e->pcxt);
-                break;
-            case T_IndexScanState:
+            break;
+        case T_IndexScanState:
+            if (planstate->plan->parallel_aware)
                 ExecIndexScanEstimate((IndexScanState *) planstate,
                                       e->pcxt);
-                break;
-            case T_IndexOnlyScanState:
+            break;
+        case T_IndexOnlyScanState:
+            if (planstate->plan->parallel_aware)
                 ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
                                           e->pcxt);
-                break;
-            case T_ForeignScanState:
+            break;
+        case T_ForeignScanState:
+            if (planstate->plan->parallel_aware)
                 ExecForeignScanEstimate((ForeignScanState *) planstate,
                                         e->pcxt);
-                break;
-            case T_CustomScanState:
+            break;
+        case T_CustomScanState:
+            if (planstate->plan->parallel_aware)
                 ExecCustomScanEstimate((CustomScanState *) planstate,
                                        e->pcxt);
-                break;
-            case T_BitmapHeapScanState:
+            break;
+        case T_BitmapHeapScanState:
+            if (planstate->plan->parallel_aware)
                 ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
                                        e->pcxt);
-                break;
-            default:
-                break;
-        }
+            break;
+        case T_SortState:
+            /* even when not parallel-aware */
+            ExecSortEstimate((SortState *) planstate, e->pcxt);
+        default:
+            break;
     }
 
     return planstate_tree_walker(planstate, ExecParallelEstimate, e);
@@ -276,46 +282,51 @@ ExecParallelInitializeDSM(PlanState *planstate,
     d->nnodes++;
 
     /*
-     * Call initializers for parallel-aware plan nodes.
+     * Call initializers for DSM-using plan nodes.
      *
-     * Ordinary plan nodes won't do anything here, but parallel-aware plan
-     * nodes may need to initialize shared state in the DSM before parallel
-     * workers are available.  They can allocate the space they previously
+     * Most plan nodes won't do anything here, but plan nodes that allocated
+     * DSM may need to initialize shared state in the DSM before parallel
+     * workers are launched.  They can allocate the space they previously
      * estimated using shm_toc_allocate, and add the keys they previously
      * estimated using shm_toc_insert, in each case targeting pcxt->toc.
      */
-    if (planstate->plan->parallel_aware)
+    switch (nodeTag(planstate))
     {
-        switch (nodeTag(planstate))
-        {
-            case T_SeqScanState:
+        case T_SeqScanState:
+            if (planstate->plan->parallel_aware)
                 ExecSeqScanInitializeDSM((SeqScanState *) planstate,
                                          d->pcxt);
-                break;
-            case T_IndexScanState:
+            break;
+        case T_IndexScanState:
+            if (planstate->plan->parallel_aware)
                 ExecIndexScanInitializeDSM((IndexScanState *) planstate,
                                            d->pcxt);
-                break;
-            case T_IndexOnlyScanState:
+            break;
+        case T_IndexOnlyScanState:
+            if (planstate->plan->parallel_aware)
                 ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
                                                d->pcxt);
-                break;
-            case T_ForeignScanState:
+            break;
+        case T_ForeignScanState:
+            if (planstate->plan->parallel_aware)
                 ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
                                              d->pcxt);
-                break;
-            case T_CustomScanState:
+            break;
+        case T_CustomScanState:
+            if (planstate->plan->parallel_aware)
                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
                                             d->pcxt);
-                break;
-            case T_BitmapHeapScanState:
+            break;
+        case T_BitmapHeapScanState:
+            if (planstate->plan->parallel_aware)
                 ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
                                             d->pcxt);
-                break;
-
-            default:
-                break;
-        }
+            break;
+        case T_SortState:
+            /* even when not parallel-aware */
+            ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
+        default:
+            break;
     }
 
     return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
@@ -642,6 +653,13 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
     planstate->worker_instrument->num_workers = instrumentation->num_workers;
     memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
 
+    /*
+     * Perform any node-type-specific work that needs to be done.  Currently,
+     * only Sort nodes need to do anything here.
+     */
+    if (IsA(planstate, SortState))
+        ExecSortRetrieveInstrumentation((SortState *) planstate);
+
     return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
                                  instrumentation);
 }
@@ -801,35 +819,40 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
     if (planstate == NULL)
         return false;
 
-    /* Call initializers for parallel-aware plan nodes. */
-    if (planstate->plan->parallel_aware)
+    switch (nodeTag(planstate))
     {
-        switch (nodeTag(planstate))
-        {
-            case T_SeqScanState:
+        case T_SeqScanState:
+            if (planstate->plan->parallel_aware)
                 ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
-                break;
-            case T_IndexScanState:
+            break;
+        case T_IndexScanState:
+            if (planstate->plan->parallel_aware)
                 ExecIndexScanInitializeWorker((IndexScanState *) planstate, toc);
-                break;
-            case T_IndexOnlyScanState:
+            break;
+        case T_IndexOnlyScanState:
+            if (planstate->plan->parallel_aware)
                 ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
                                                   toc);
-                break;
-            case T_ForeignScanState:
+            break;
+        case T_ForeignScanState:
+            if (planstate->plan->parallel_aware)
                 ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
                                                 toc);
-                break;
-            case T_CustomScanState:
+            break;
+        case T_CustomScanState:
+            if (planstate->plan->parallel_aware)
                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
                                                toc);
-                break;
-            case T_BitmapHeapScanState:
+            break;
+        case T_BitmapHeapScanState:
+            if (planstate->plan->parallel_aware)
                 ExecBitmapHeapInitializeWorker(
                                                (BitmapHeapScanState *) planstate, toc);
-                break;
-            default:
-                break;
-        }
+            break;
+        case T_SortState:
+            /* even when not parallel-aware */
+            ExecSortInitializeWorker((SortState *) planstate, toc);
+        default:
+            break;
     }
 
     return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index aae4150e2c8..66ef109c122 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -15,6 +15,7 @@
 
 #include "postgres.h"
 
+#include "access/parallel.h"
 #include "executor/execdebug.h"
 #include "executor/nodeSort.h"
 #include "miscadmin.h"
@@ -127,6 +128,15 @@ ExecSort(PlanState *pstate)
         node->sort_Done = true;
         node->bounded_Done = node->bounded;
         node->bound_Done = node->bound;
+        if (node->shared_info && node->am_worker)
+        {
+            TuplesortInstrumentation *si;
+
+            Assert(IsParallelWorker());
+            Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+            si = &node->shared_info->sinstrument[ParallelWorkerNumber];
+            tuplesort_get_stats(tuplesortstate, si);
+        }
         SO1_printf("ExecSort: %s\n", "sorting done");
     }
 
@@ -334,3 +344,90 @@ ExecReScanSort(SortState *node)
     else
         tuplesort_rescan((Tuplesortstate *) node->tuplesortstate);
 }
+
+/* ----------------------------------------------------------------
+ *                      Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *        ExecSortEstimate
+ *
+ *        Estimate space required to propagate sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSortEstimate(SortState *node, ParallelContext *pcxt)
+{
+    Size        size;
+
+    /* don't need this if not instrumenting or no workers */
+    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+        return;
+
+    size = mul_size(pcxt->nworkers, sizeof(TuplesortInstrumentation));
+    size = add_size(size, offsetof(SharedSortInfo, sinstrument));
+    shm_toc_estimate_chunk(&pcxt->estimator, size);
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *        ExecSortInitializeDSM
+ *
+ *        Initialize DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
+{
+    Size        size;
+
+    /* don't need this if not instrumenting or no workers */
+    if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+        return;
+
+    size = offsetof(SharedSortInfo, sinstrument)
+        + pcxt->nworkers * sizeof(TuplesortInstrumentation);
+    node->shared_info = shm_toc_allocate(pcxt->toc, size);
+    /* ensure any unfilled slots will contain zeroes */
+    memset(node->shared_info, 0, size);
+    node->shared_info->num_workers = pcxt->nworkers;
+    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+                   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *        ExecSortInitializeWorker
+ *
+ *        Attach worker to DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSortInitializeWorker(SortState *node, shm_toc *toc)
+{
+    node->shared_info =
+        shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, true);
+    node->am_worker = true;
+}
+
+/* ----------------------------------------------------------------
+ *        ExecSortRetrieveInstrumentation
+ *
+ *        Transfer sort statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSortRetrieveInstrumentation(SortState *node)
+{
+    Size        size;
+    SharedSortInfo *si;
+
+    if (node->shared_info == NULL)
+        return;
+
+    size = offsetof(SharedSortInfo, sinstrument)
+        + node->shared_info->num_workers * sizeof(TuplesortInstrumentation);
+    si = palloc(size);
+    memcpy(si, node->shared_info, size);
+    node->shared_info = si;
+}
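The commit follows the executor's standard four-phase DSM protocol: the leader reserves space (Estimate), then allocates and publishes it under the plan node ID (InitializeDSM); each worker attaches by that same key (InitializeWorker); and the leader copies the results back out of shared memory (RetrieveInstrumentation). Below is a minimal sketch of that pattern for a hypothetical node "Foo" — SharedFooInfo and the foo_* names are illustrative only, while shm_toc_*, add_size/mul_size, ParallelContext, and ParallelWorkerNumber are the real PostgreSQL APIs the patch uses.

    /*
     * Sketch (not part of the commit): the DSM estimate/initialize/attach
     * pattern used above, for a hypothetical node "Foo".
     */
    #include "postgres.h"

    #include "access/parallel.h"
    #include "storage/shm_toc.h"
    #include "storage/shmem.h"

    typedef struct SharedFooInfo
    {
        int         num_workers;
        int         stats[FLEXIBLE_ARRAY_MEMBER];   /* one slot per worker */
    } SharedFooInfo;

    /* Phase 1, leader: reserve one chunk and one toc key before DSM sizing. */
    static void
    foo_estimate(ParallelContext *pcxt)
    {
        Size        size;

        size = add_size(offsetof(SharedFooInfo, stats),
                        mul_size(pcxt->nworkers, sizeof(int)));
        shm_toc_estimate_chunk(&pcxt->estimator, size);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
    }

    /* Phase 2, leader: allocate the chunk, zero it, and publish it by key. */
    static SharedFooInfo *
    foo_initialize_dsm(ParallelContext *pcxt, int plan_node_id)
    {
        Size        size;
        SharedFooInfo *shared;

        size = add_size(offsetof(SharedFooInfo, stats),
                        mul_size(pcxt->nworkers, sizeof(int)));
        shared = shm_toc_allocate(pcxt->toc, size);
        memset(shared, 0, size);    /* slots of unlaunched workers stay zero */
        shared->num_workers = pcxt->nworkers;
        shm_toc_insert(pcxt->toc, plan_node_id, shared);
        return shared;
    }

    /* Phase 3, worker: attach by the same key and write into its own slot. */
    static void
    foo_worker_report(shm_toc *toc, int plan_node_id, int stat)
    {
        SharedFooInfo *shared;

        shared = shm_toc_lookup(toc, plan_node_id, false);
        shared->stats[ParallelWorkerNumber] = stat;
    }

Phase 4 mirrors ExecSortRetrieveInstrumentation above: the leader pallocs a private copy and memcpys the shared struct into it, so the statistics remain available to EXPLAIN after the DSM segment is detached. The leader-side memset in phase 2 serves the same purpose as the "ensure any unfilled slots will contain zeroes" comment in ExecSortInitializeDSM: workers that are planned but never launched leave their slots zeroed rather than uninitialized.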