diff options
author | Robert Haas <rhaas@postgresql.org> | 2015-11-11 08:57:52 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2015-11-11 08:57:52 -0500 |
commit | f0661c4e8c44c0ec7acd4ea7c82e85b265447398 (patch) | |
tree | 0a31416ab40a9be4ab0f43c6ddd73221eed9dec6 /src/backend/executor | |
parent | f764ecd81b2a8a1e9000d43a73ca5eec8e8008bc (diff) | |
download | postgresql-f0661c4e8c44c0ec7acd4ea7c82e85b265447398.tar.gz postgresql-f0661c4e8c44c0ec7acd4ea7c82e85b265447398.zip |
Make sequential scans parallel-aware.
In addition, this patch fills in a number of missing bits and pieces in
the parallel infrastructure. Paths and plans now have a parallel_aware
flag indicating whether whatever parallel-aware logic they have should
be engaged. It is believed that we will need this flag for a number of
path/plan types, not just sequential scans, which is why the flag is
generic rather than part of the SeqScan structures specifically.
Also, execParallel.c now gives parallel nodes a chance to initialize
their PlanState nodes from the DSM during parallel worker startup.
Amit Kapila, with a fair amount of adjustment by me. Review of previous
patch versions by Haribabu Kommi and others.
Diffstat (limited to 'src/backend/executor')
-rw-r--r-- | src/backend/executor/execAmi.c | 9 | ||||
-rw-r--r-- | src/backend/executor/execParallel.c | 54 | ||||
-rw-r--r-- | src/backend/executor/nodeSeqscan.c | 136 |
3 files changed, 158 insertions, 41 deletions
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 163650cecd1..b969fc08037 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -439,6 +439,15 @@ ExecSupportsBackwardScan(Plan *node) if (node == NULL) return false; + /* + * Parallel-aware nodes return a subset of the tuples in each worker, + * and in general we can't expect to have enough bookkeeping state to + * know which ones we returned in this worker as opposed to some other + * worker. + */ + if (node->parallel_aware) + return false; + switch (nodeTag(node)) { case T_Result: diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 99a9de3cdc3..eae13c56477 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,7 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeSeqscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" #include "optimizer/planmain.h" @@ -167,10 +168,16 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) /* Count this node. */ e->nnodes++; - /* - * XXX. Call estimators for parallel-aware nodes here, when we have - * some. - */ + /* Call estimators for parallel-aware nodes. */ + switch (nodeTag(planstate)) + { + case T_SeqScanState: + ExecSeqScanEstimate((SeqScanState *) planstate, + e->pcxt); + break; + default: + break; + } return planstate_tree_walker(planstate, ExecParallelEstimate, e); } @@ -205,10 +212,16 @@ ExecParallelInitializeDSM(PlanState *planstate, /* Count this node. */ d->nnodes++; - /* - * XXX. Call initializers for parallel-aware plan nodes, when we have - * some. - */ + /* Call initializers for parallel-aware plan nodes. 
*/ + switch (nodeTag(planstate)) + { + case T_SeqScanState: + ExecSeqScanInitializeDSM((SeqScanState *) planstate, + d->pcxt); + break; + default: + break; + } return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d); } @@ -575,6 +588,30 @@ ExecParallelReportInstrumentation(PlanState *planstate, } /* + * Initialize the PlanState and its descendents with the information + * retrieved from shared memory. This has to be done once the PlanState + * is allocated and initialized by executor; that is, after ExecutorStart(). + */ +static bool +ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) +{ + if (planstate == NULL) + return false; + + /* Call initializers for parallel-aware plan nodes. */ + switch (nodeTag(planstate)) + { + case T_SeqScanState: + ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); + break; + default: + break; + } + + return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc); +} + +/* * Main entrypoint for parallel query worker processes. * * We reach this function from ParallelMain, so the setup necessary to create @@ -610,6 +647,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Start up the executor, have it run the plan, and then shut it down. */ ExecutorStart(queryDesc, 0); + ExecParallelInitializeWorker(queryDesc->planstate, toc); ExecutorRun(queryDesc, ForwardScanDirection, 0L); ExecutorFinish(queryDesc); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 3cb81fccc30..b858f2f3af8 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -19,6 +19,10 @@ * ExecInitSeqScan creates and initializes a seqscan node. * ExecEndSeqScan releases any storage allocated. 
* ExecReScanSeqScan rescans the relation + * + * ExecSeqScanEstimate estimates DSM space needed for parallel scan + * ExecSeqScanInitializeDSM initialize DSM for parallel scan + * ExecSeqScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" @@ -53,10 +57,22 @@ SeqNext(SeqScanState *node) /* * get information from the estate and scan state */ - scandesc = node->ss_currentScanDesc; - estate = node->ps.state; + scandesc = node->ss.ss_currentScanDesc; + estate = node->ss.ps.state; direction = estate->es_direction; - slot = node->ss_ScanTupleSlot; + slot = node->ss.ss_ScanTupleSlot; + + if (scandesc == NULL) + { + /* + * We reach here if the scan is not parallel, or if we're executing + * a scan that was intended to be parallel serially. + */ + scandesc = heap_beginscan(node->ss.ss_currentRelation, + estate->es_snapshot, + 0, NULL); + node->ss.ss_currentScanDesc = scandesc; + } /* * get the next tuple from the table @@ -123,27 +139,19 @@ static void InitScanRelation(SeqScanState *node, EState *estate, int eflags) { Relation currentRelation; - HeapScanDesc currentScanDesc; /* * get the relation object id from the relid'th entry in the range table, * open that relation and acquire appropriate lock on it. 
*/ currentRelation = ExecOpenScanRelation(estate, - ((SeqScan *) node->ps.plan)->scanrelid, + ((SeqScan *) node->ss.ps.plan)->scanrelid, eflags); - /* initialize a heapscan */ - currentScanDesc = heap_beginscan(currentRelation, - estate->es_snapshot, - 0, - NULL); - - node->ss_currentRelation = currentRelation; - node->ss_currentScanDesc = currentScanDesc; + node->ss.ss_currentRelation = currentRelation; /* and report the scan tuple slot's rowtype */ - ExecAssignScanType(node, RelationGetDescr(currentRelation)); + ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation)); } @@ -167,44 +175,44 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) * create state structure */ scanstate = makeNode(SeqScanState); - scanstate->ps.plan = (Plan *) node; - scanstate->ps.state = estate; + scanstate->ss.ps.plan = (Plan *) node; + scanstate->ss.ps.state = estate; /* * Miscellaneous initialization * * create expression context for node */ - ExecAssignExprContext(estate, &scanstate->ps); + ExecAssignExprContext(estate, &scanstate->ss.ps); /* * initialize child expressions */ - scanstate->ps.targetlist = (List *) + scanstate->ss.ps.targetlist = (List *) ExecInitExpr((Expr *) node->plan.targetlist, (PlanState *) scanstate); - scanstate->ps.qual = (List *) + scanstate->ss.ps.qual = (List *) ExecInitExpr((Expr *) node->plan.qual, (PlanState *) scanstate); /* * tuple table initialization */ - ExecInitResultTupleSlot(estate, &scanstate->ps); - ExecInitScanTupleSlot(estate, scanstate); + ExecInitResultTupleSlot(estate, &scanstate->ss.ps); + ExecInitScanTupleSlot(estate, &scanstate->ss); /* * initialize scan relation */ InitScanRelation(scanstate, estate, eflags); - scanstate->ps.ps_TupFromTlist = false; + scanstate->ss.ps.ps_TupFromTlist = false; /* * Initialize result tuple type and projection info. 
*/ - ExecAssignResultTypeFromTL(&scanstate->ps); - ExecAssignScanProjectionInfo(scanstate); + ExecAssignResultTypeFromTL(&scanstate->ss.ps); + ExecAssignScanProjectionInfo(&scanstate->ss); return scanstate; } @@ -224,24 +232,25 @@ ExecEndSeqScan(SeqScanState *node) /* * get information from node */ - relation = node->ss_currentRelation; - scanDesc = node->ss_currentScanDesc; + relation = node->ss.ss_currentRelation; + scanDesc = node->ss.ss_currentScanDesc; /* * Free the exprcontext */ - ExecFreeExprContext(&node->ps); + ExecFreeExprContext(&node->ss.ps); /* * clean out the tuple table */ - ExecClearTuple(node->ps.ps_ResultTupleSlot); - ExecClearTuple(node->ss_ScanTupleSlot); + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + ExecClearTuple(node->ss.ss_ScanTupleSlot); /* * close heap scan */ - heap_endscan(scanDesc); + if (scanDesc != NULL) + heap_endscan(scanDesc); /* * close the heap relation. @@ -265,10 +274,71 @@ ExecReScanSeqScan(SeqScanState *node) { HeapScanDesc scan; - scan = node->ss_currentScanDesc; + scan = node->ss.ss_currentScanDesc; - heap_rescan(scan, /* scan desc */ - NULL); /* new scan keys */ + if (scan != NULL) + heap_rescan(scan, /* scan desc */ + NULL); /* new scan keys */ ExecScanReScan((ScanState *) node); } + +/* ---------------------------------------------------------------- + * Parallel Scan Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecSeqScanEstimate + * + * estimates the space required to serialize seqscan node. 
+ * ---------------------------------------------------------------- + */ +void +ExecSeqScanEstimate(SeqScanState *node, + ParallelContext *pcxt) +{ + EState *estate = node->ss.ps.state; + + node->pscan_len = heap_parallelscan_estimate(estate->es_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecSeqScanInitializeDSM + * + * Set up a parallel heap scan descriptor. + * ---------------------------------------------------------------- + */ +void +ExecSeqScanInitializeDSM(SeqScanState *node, + ParallelContext *pcxt) +{ + EState *estate = node->ss.ps.state; + ParallelHeapScanDesc pscan; + + pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); + heap_parallelscan_initialize(pscan, + node->ss.ss_currentRelation, + estate->es_snapshot); + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); + node->ss.ss_currentScanDesc = + heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); +} + +/* ---------------------------------------------------------------- + * ExecSeqScanInitializeWorker + * + * Copy relevant information from TOC into planstate. + * ---------------------------------------------------------------- + */ +void +ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc) +{ + ParallelHeapScanDesc pscan; + + pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); + node->ss.ss_currentScanDesc = + heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); +} |