aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor
diff options
context:
space:
mode:
author: Robert Haas <rhaas@postgresql.org> 2015-11-11 08:57:52 -0500
committer: Robert Haas <rhaas@postgresql.org> 2015-11-11 08:57:52 -0500
commit: f0661c4e8c44c0ec7acd4ea7c82e85b265447398 (patch)
tree: 0a31416ab40a9be4ab0f43c6ddd73221eed9dec6 /src/backend/executor
parent: f764ecd81b2a8a1e9000d43a73ca5eec8e8008bc (diff)
download: postgresql-f0661c4e8c44c0ec7acd4ea7c82e85b265447398.tar.gz
download: postgresql-f0661c4e8c44c0ec7acd4ea7c82e85b265447398.zip
Make sequential scans parallel-aware.
In addition, this patch fills in a number of missing bits and pieces in the parallel infrastructure. Paths and plans now have a parallel_aware flag indicating whether whatever parallel-aware logic they have should be engaged. It is believed that we will need this flag for a number of path/plan types, not just sequential scans, which is why the flag is generic rather than part of the SeqScan structures specifically. Also, execParallel.c now gives parallel nodes a chance to initialize their PlanState nodes from the DSM during parallel worker startup. Amit Kapila, with a fair amount of adjustment by me. Review of previous patch versions by Haribabu Kommi and others.
Diffstat (limited to 'src/backend/executor')
-rw-r--r--  src/backend/executor/execAmi.c       9
-rw-r--r--  src/backend/executor/execParallel.c  54
-rw-r--r--  src/backend/executor/nodeSeqscan.c   136
3 files changed, 158 insertions, 41 deletions
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 163650cecd1..b969fc08037 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -439,6 +439,15 @@ ExecSupportsBackwardScan(Plan *node)
if (node == NULL)
return false;
+ /*
+ * Parallel-aware nodes return a subset of the tuples in each worker,
+ * and in general we can't expect to have enough bookkeeping state to
+ * know which ones we returned in this worker as opposed to some other
+ * worker.
+ */
+ if (node->parallel_aware)
+ return false;
+
switch (nodeTag(node))
{
case T_Result:
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 99a9de3cdc3..eae13c56477 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
@@ -167,10 +168,16 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
/* Count this node. */
e->nnodes++;
- /*
- * XXX. Call estimators for parallel-aware nodes here, when we have
- * some.
- */
+ /* Call estimators for parallel-aware nodes. */
+ switch (nodeTag(planstate))
+ {
+ case T_SeqScanState:
+ ExecSeqScanEstimate((SeqScanState *) planstate,
+ e->pcxt);
+ break;
+ default:
+ break;
+ }
return planstate_tree_walker(planstate, ExecParallelEstimate, e);
}
@@ -205,10 +212,16 @@ ExecParallelInitializeDSM(PlanState *planstate,
/* Count this node. */
d->nnodes++;
- /*
- * XXX. Call initializers for parallel-aware plan nodes, when we have
- * some.
- */
+ /* Call initializers for parallel-aware plan nodes. */
+ switch (nodeTag(planstate))
+ {
+ case T_SeqScanState:
+ ExecSeqScanInitializeDSM((SeqScanState *) planstate,
+ d->pcxt);
+ break;
+ default:
+ break;
+ }
return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
}
@@ -575,6 +588,30 @@ ExecParallelReportInstrumentation(PlanState *planstate,
}
/*
+ * Initialize the PlanState and its descendents with the information
+ * retrieved from shared memory. This has to be done once the PlanState
+ * is allocated and initialized by executor; that is, after ExecutorStart().
+ */
+static bool
+ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
+{
+ if (planstate == NULL)
+ return false;
+
+ /* Call initializers for parallel-aware plan nodes. */
+ switch (nodeTag(planstate))
+ {
+ case T_SeqScanState:
+ ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc);
+ break;
+ default:
+ break;
+ }
+
+ return planstate_tree_walker(planstate, ExecParallelInitializeWorker, toc);
+}
+
+/*
* Main entrypoint for parallel query worker processes.
*
* We reach this function from ParallelMain, so the setup necessary to create
@@ -610,6 +647,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Start up the executor, have it run the plan, and then shut it down. */
ExecutorStart(queryDesc, 0);
+ ExecParallelInitializeWorker(queryDesc->planstate, toc);
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
ExecutorFinish(queryDesc);
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 3cb81fccc30..b858f2f3af8 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -19,6 +19,10 @@
* ExecInitSeqScan creates and initializes a seqscan node.
* ExecEndSeqScan releases any storage allocated.
* ExecReScanSeqScan rescans the relation
+ *
+ * ExecSeqScanEstimate estimates DSM space needed for parallel scan
+ * ExecSeqScanInitializeDSM initialize DSM for parallel scan
+ * ExecSeqScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
@@ -53,10 +57,22 @@ SeqNext(SeqScanState *node)
/*
* get information from the estate and scan state
*/
- scandesc = node->ss_currentScanDesc;
- estate = node->ps.state;
+ scandesc = node->ss.ss_currentScanDesc;
+ estate = node->ss.ps.state;
direction = estate->es_direction;
- slot = node->ss_ScanTupleSlot;
+ slot = node->ss.ss_ScanTupleSlot;
+
+ if (scandesc == NULL)
+ {
+ /*
+ * We reach here if the scan is not parallel, or if we're executing
+ * a scan that was intended to be parallel serially.
+ */
+ scandesc = heap_beginscan(node->ss.ss_currentRelation,
+ estate->es_snapshot,
+ 0, NULL);
+ node->ss.ss_currentScanDesc = scandesc;
+ }
/*
* get the next tuple from the table
@@ -123,27 +139,19 @@ static void
InitScanRelation(SeqScanState *node, EState *estate, int eflags)
{
Relation currentRelation;
- HeapScanDesc currentScanDesc;
/*
* get the relation object id from the relid'th entry in the range table,
* open that relation and acquire appropriate lock on it.
*/
currentRelation = ExecOpenScanRelation(estate,
- ((SeqScan *) node->ps.plan)->scanrelid,
+ ((SeqScan *) node->ss.ps.plan)->scanrelid,
eflags);
- /* initialize a heapscan */
- currentScanDesc = heap_beginscan(currentRelation,
- estate->es_snapshot,
- 0,
- NULL);
-
- node->ss_currentRelation = currentRelation;
- node->ss_currentScanDesc = currentScanDesc;
+ node->ss.ss_currentRelation = currentRelation;
/* and report the scan tuple slot's rowtype */
- ExecAssignScanType(node, RelationGetDescr(currentRelation));
+ ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
}
@@ -167,44 +175,44 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
* create state structure
*/
scanstate = makeNode(SeqScanState);
- scanstate->ps.plan = (Plan *) node;
- scanstate->ps.state = estate;
+ scanstate->ss.ps.plan = (Plan *) node;
+ scanstate->ss.ps.state = estate;
/*
* Miscellaneous initialization
*
* create expression context for node
*/
- ExecAssignExprContext(estate, &scanstate->ps);
+ ExecAssignExprContext(estate, &scanstate->ss.ps);
/*
* initialize child expressions
*/
- scanstate->ps.targetlist = (List *)
+ scanstate->ss.ps.targetlist = (List *)
ExecInitExpr((Expr *) node->plan.targetlist,
(PlanState *) scanstate);
- scanstate->ps.qual = (List *)
+ scanstate->ss.ps.qual = (List *)
ExecInitExpr((Expr *) node->plan.qual,
(PlanState *) scanstate);
/*
* tuple table initialization
*/
- ExecInitResultTupleSlot(estate, &scanstate->ps);
- ExecInitScanTupleSlot(estate, scanstate);
+ ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
+ ExecInitScanTupleSlot(estate, &scanstate->ss);
/*
* initialize scan relation
*/
InitScanRelation(scanstate, estate, eflags);
- scanstate->ps.ps_TupFromTlist = false;
+ scanstate->ss.ps.ps_TupFromTlist = false;
/*
* Initialize result tuple type and projection info.
*/
- ExecAssignResultTypeFromTL(&scanstate->ps);
- ExecAssignScanProjectionInfo(scanstate);
+ ExecAssignResultTypeFromTL(&scanstate->ss.ps);
+ ExecAssignScanProjectionInfo(&scanstate->ss);
return scanstate;
}
@@ -224,24 +232,25 @@ ExecEndSeqScan(SeqScanState *node)
/*
* get information from node
*/
- relation = node->ss_currentRelation;
- scanDesc = node->ss_currentScanDesc;
+ relation = node->ss.ss_currentRelation;
+ scanDesc = node->ss.ss_currentScanDesc;
/*
* Free the exprcontext
*/
- ExecFreeExprContext(&node->ps);
+ ExecFreeExprContext(&node->ss.ps);
/*
* clean out the tuple table
*/
- ExecClearTuple(node->ps.ps_ResultTupleSlot);
- ExecClearTuple(node->ss_ScanTupleSlot);
+ ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+ ExecClearTuple(node->ss.ss_ScanTupleSlot);
/*
* close heap scan
*/
- heap_endscan(scanDesc);
+ if (scanDesc != NULL)
+ heap_endscan(scanDesc);
/*
* close the heap relation.
@@ -265,10 +274,71 @@ ExecReScanSeqScan(SeqScanState *node)
{
HeapScanDesc scan;
- scan = node->ss_currentScanDesc;
+ scan = node->ss.ss_currentScanDesc;
- heap_rescan(scan, /* scan desc */
- NULL); /* new scan keys */
+ if (scan != NULL)
+ heap_rescan(scan, /* scan desc */
+ NULL); /* new scan keys */
ExecScanReScan((ScanState *) node);
}
+
+/* ----------------------------------------------------------------
+ * Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecSeqScanEstimate
+ *
+ * estimates the space required to serialize seqscan node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanEstimate(SeqScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+
+ node->pscan_len = heap_parallelscan_estimate(estate->es_snapshot);
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ * ExecSeqScanInitializeDSM
+ *
+ * Set up a parallel heap scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanInitializeDSM(SeqScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+ ParallelHeapScanDesc pscan;
+
+ pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
+ heap_parallelscan_initialize(pscan,
+ node->ss.ss_currentRelation,
+ estate->es_snapshot);
+ shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+ node->ss.ss_currentScanDesc =
+ heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+}
+
+/* ----------------------------------------------------------------
+ * ExecSeqScanInitializeWorker
+ *
+ * Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc)
+{
+ ParallelHeapScanDesc pscan;
+
+ pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+ node->ss.ss_currentScanDesc =
+ heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+}