diff options
Diffstat (limited to 'src')
25 files changed, 663 insertions, 8 deletions
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index f0d9e94eed2..7fb8a1458df 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -853,6 +853,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_SampleScan: pname = sname = "Sample Scan"; break; + case T_Gather: + pname = sname = "Gather"; + break; case T_IndexScan: pname = sname = "Index Scan"; break; @@ -1276,6 +1279,22 @@ ExplainNode(PlanState *planstate, List *ancestors, show_instrumentation_count("Rows Removed by Filter", 1, planstate, es); break; + case T_Gather: + { + Gather *gather = (Gather *) plan; + + show_scan_qual(plan->qual, "Filter", planstate, ancestors, es); + if (plan->qual) + show_instrumentation_count("Rows Removed by Filter", 1, + planstate, es); + ExplainPropertyInteger("Number of Workers", + gather->num_workers, es); + if (gather->single_copy) + ExplainPropertyText("Single Copy", + gather->single_copy ? "true" : "false", + es); + } + break; case T_FunctionScan: if (es->verbose) { diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index f5e1e1aefcd..51edd4c5e70 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -17,8 +17,8 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ execScan.o execTuples.o \ execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \ nodeBitmapAnd.o nodeBitmapOr.o \ - nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \ - nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ + nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeGather.o \ + nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ nodeLimit.o nodeLockRows.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \ nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \ diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 93e1e9a691c..163650cecd1 100644 --- 
a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -24,6 +24,7 @@ #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" #include "executor/nodeFunctionscan.h" +#include "executor/nodeGather.h" #include "executor/nodeGroup.h" #include "executor/nodeGroup.h" #include "executor/nodeHash.h" @@ -160,6 +161,10 @@ ExecReScan(PlanState *node) ExecReScanSampleScan((SampleScanState *) node); break; + case T_GatherState: + ExecReScanGather((GatherState *) node); + break; + case T_IndexScanState: ExecReScanIndexScan((IndexScanState *) node); break; @@ -467,6 +472,9 @@ ExecSupportsBackwardScan(Plan *node) /* Simplify life for tablesample methods by disallowing this */ return false; + case T_Gather: + return false; + case T_IndexScan: return IndexSupportsBackwardScan(((IndexScan *) node)->indexid) && TargetListSupportsBackwardScan(node->targetlist); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 85ff46b8026..37b7bbd413b 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -347,6 +347,9 @@ standard_ExecutorRun(QueryDesc *queryDesc, direction, dest); + /* Allow nodes to release or shut down resources. */ + (void) ExecShutdownNode(queryDesc->planstate); + /* * shutdown tuple receiver, if we started it */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index a409a9a571f..e6930c1d51c 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -71,7 +71,7 @@ typedef struct ExecParallelInitializeDSMContext } ExecParallelInitializeDSMContext; /* Helper functions that run in the parallel leader. 
*/ -static char *ExecSerializePlan(Plan *plan, List *rangetable); +static char *ExecSerializePlan(Plan *plan, EState *estate); static bool ExecParallelEstimate(PlanState *node, ExecParallelEstimateContext *e); static bool ExecParallelInitializeDSM(PlanState *node, @@ -88,7 +88,7 @@ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); * Create a serialized representation of the plan to be sent to each worker. */ static char * -ExecSerializePlan(Plan *plan, List *rangetable) +ExecSerializePlan(Plan *plan, EState *estate) { PlannedStmt *pstmt; ListCell *tlist; @@ -125,13 +125,13 @@ ExecSerializePlan(Plan *plan, List *rangetable) pstmt->canSetTag = 1; pstmt->transientPlan = 0; pstmt->planTree = plan; - pstmt->rtable = rangetable; + pstmt->rtable = estate->es_range_table; pstmt->resultRelations = NIL; pstmt->utilityStmt = NULL; pstmt->subplans = NIL; pstmt->rewindPlanIDs = NULL; pstmt->rowMarks = NIL; - pstmt->nParamExec = 0; + pstmt->nParamExec = estate->es_plannedstmt->nParamExec; pstmt->relationOids = NIL; pstmt->invalItems = NIL; /* workers can't replan anyway... */ pstmt->hasRowSecurity = false; @@ -271,7 +271,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) pei->planstate = planstate; /* Fix up and serialize plan to be sent to workers. */ - pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table); + pstmt_data = ExecSerializePlan(planstate->plan, estate); /* Create a parallel context. */ pcxt = CreateParallelContext(ParallelQueryMain, nworkers); @@ -568,7 +568,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecutorStart(queryDesc, 0); ExecutorRun(queryDesc, ForwardScanDirection, 0L); ExecutorFinish(queryDesc); - ExecutorEnd(queryDesc); /* Report buffer usage during parallel execution. 
*/ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE); @@ -579,6 +578,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecParallelReportInstrumentation(queryDesc->planstate, instrumentation); + /* Must do this after capturing instrumentation. */ + ExecutorEnd(queryDesc); + /* Cleanup. */ FreeQueryDesc(queryDesc); (*receiver->rDestroy) (receiver); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 03c2febc3e1..5bc1d489421 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -100,6 +100,7 @@ #include "executor/nodeMergejoin.h" #include "executor/nodeModifyTable.h" #include "executor/nodeNestloop.h" +#include "executor/nodeGather.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" #include "executor/nodeSamplescan.h" @@ -113,6 +114,7 @@ #include "executor/nodeValuesscan.h" #include "executor/nodeWindowAgg.h" #include "executor/nodeWorktablescan.h" +#include "nodes/nodeFuncs.h" #include "miscadmin.h" @@ -307,6 +309,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_Gather: + result = (PlanState *) ExecInitGather((Gather *) node, + estate, eflags); + break; + case T_Hash: result = (PlanState *) ExecInitHash((Hash *) node, estate, eflags); @@ -504,6 +511,10 @@ ExecProcNode(PlanState *node) result = ExecUnique((UniqueState *) node); break; + case T_GatherState: + result = ExecGather((GatherState *) node); + break; + case T_HashState: result = ExecHash((HashState *) node); break; @@ -658,6 +669,10 @@ ExecEndNode(PlanState *node) ExecEndSampleScan((SampleScanState *) node); break; + case T_GatherState: + ExecEndGather((GatherState *) node); + break; + case T_IndexScanState: ExecEndIndexScan((IndexScanState *) node); break; @@ -769,3 +784,34 @@ ExecEndNode(PlanState *node) break; } } + +/* + * ExecShutdownNode + * + * Give execution nodes a chance to stop asynchronous resource consumption + * and release any 
resources still held. Currently, this is only used for + * parallel query, but we might want to extend it to other cases also (e.g. + * FDW). We might also want to call it sooner, as soon as it's evident that + * no more rows will be needed (e.g. when a Limit is filled) rather than only + * at the end of ExecutorRun. + */ +bool +ExecShutdownNode(PlanState *node) +{ + if (node == NULL) + return false; + + switch (nodeTag(node)) + { + case T_GatherState: + { + ExecShutdownGather((GatherState *) node); + return true; + } + break; + default: + break; + } + + return planstate_tree_walker(node, ExecShutdownNode, NULL); +} diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c new file mode 100644 index 00000000000..735dbaa2226 --- /dev/null +++ b/src/backend/executor/nodeGather.c @@ -0,0 +1,299 @@ +/*------------------------------------------------------------------------- + * + * nodeGather.c + * Support routines for scanning a plan via multiple workers. + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/nodeGather.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/relscan.h" +#include "executor/execdebug.h" +#include "executor/execParallel.h" +#include "executor/nodeGather.h" +#include "executor/nodeSubplan.h" +#include "executor/tqueue.h" +#include "utils/rel.h" + + +static TupleTableSlot *gather_getnext(GatherState *gatherstate); + + +/* ---------------------------------------------------------------- + * ExecInitGather + * ---------------------------------------------------------------- + */ +GatherState * +ExecInitGather(Gather *node, EState *estate, int eflags) +{ + GatherState *gatherstate; + + /* Gather node doesn't have innerPlan node. 
*/ + Assert(innerPlan(node) == NULL); + + /* + * create state structure + */ + gatherstate = makeNode(GatherState); + gatherstate->ps.plan = (Plan *) node; + gatherstate->ps.state = estate; + gatherstate->need_to_scan_workers = false; + gatherstate->need_to_scan_locally = !node->single_copy; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, &gatherstate->ps); + + /* + * initialize child expressions + */ + gatherstate->ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, + (PlanState *) gatherstate); + gatherstate->ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, + (PlanState *) gatherstate); + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &gatherstate->ps); + + /* + * now initialize outer plan + */ + outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags); + + + gatherstate->ps.ps_TupFromTlist = false; + + /* + * Initialize result tuple type and projection info. + */ + ExecAssignResultTypeFromTL(&gatherstate->ps); + ExecAssignProjectionInfo(&gatherstate->ps, NULL); + + return gatherstate; +} + +/* ---------------------------------------------------------------- + * ExecGather(node) + * + * Scans the relation via multiple workers and returns + * the next qualifying tuple. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecGather(GatherState *node) +{ + int i; + TupleTableSlot *slot; + + /* + * Initialize the parallel context and workers on first execution. We do + * this on first execution rather than during node initialization, as it + * needs to allocate large dynamic segment, so it is better to do if it + * is really needed. + */ + if (!node->pei) + { + EState *estate = node->ps.state; + + /* Initialize the workers required to execute Gather node. 
*/ + node->pei = ExecInitParallelPlan(node->ps.lefttree, + estate, + ((Gather *) (node->ps.plan))->num_workers); + + /* + * Register backend workers. If the required number of workers are not + * available then we perform the scan with available workers and if + * there are no more workers available, then the Gather node will just + * scan locally. + */ + LaunchParallelWorkers(node->pei->pcxt); + + node->funnel = CreateTupleQueueFunnel(); + + for (i = 0; i < node->pei->pcxt->nworkers; ++i) + { + if (node->pei->pcxt->worker[i].bgwhandle) + { + shm_mq_set_handle(node->pei->tqueue[i], + node->pei->pcxt->worker[i].bgwhandle); + RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]); + node->need_to_scan_workers = true; + } + } + + /* If no workers are available, we must always scan locally. */ + if (!node->need_to_scan_workers) + node->need_to_scan_locally = true; + } + + slot = gather_getnext(node); + + if (TupIsNull(slot)) + { + /* + * Destroy the parallel context once we complete fetching all the + * tuples. Otherwise, the DSM and workers will stick around for the + * lifetime of the entire statement. + */ + ExecShutdownGather(node); + } + return slot; +} + +/* ---------------------------------------------------------------- + * ExecEndGather + * + * frees any storage allocated through C routines. + * ---------------------------------------------------------------- + */ +void +ExecEndGather(GatherState *node) +{ + ExecShutdownGather(node); + ExecFreeExprContext(&node->ps); + ExecClearTuple(node->ps.ps_ResultTupleSlot); + ExecEndNode(outerPlanState(node)); +} + +/* + * gather_getnext + * + * Get the next tuple from shared memory queue. This function + * is responsible for fetching tuples from all the queues associated + * with worker backends used in Gather node execution and if there is + * no data available from queues or no worker is available, it does + * fetch the data from local node. 
+ */ +TupleTableSlot * +gather_getnext(GatherState *gatherstate) +{ + PlanState *outerPlan; + TupleTableSlot *outerTupleSlot; + TupleTableSlot *slot; + HeapTuple tup; + + /* + * We can use projection info of Gather for the tuples received from + * worker backends as currently for all cases worker backends send the + * projected tuple as required by Gather node. + */ + slot = gatherstate->ps.ps_ProjInfo->pi_slot; + + while (gatherstate->need_to_scan_workers || + gatherstate->need_to_scan_locally) + { + if (gatherstate->need_to_scan_workers) + { + bool done = false; + + /* wait only if local scan is done */ + tup = TupleQueueFunnelNext(gatherstate->funnel, + gatherstate->need_to_scan_locally, + &done); + if (done) + gatherstate->need_to_scan_workers = false; + + if (HeapTupleIsValid(tup)) + { + ExecStoreTuple(tup, /* tuple to store */ + slot, /* slot to store in */ + InvalidBuffer, /* buffer associated with this + * tuple */ + true); /* pfree this pointer if not from heap */ + + return slot; + } + } + + if (gatherstate->need_to_scan_locally) + { + outerPlan = outerPlanState(gatherstate); + + outerTupleSlot = ExecProcNode(outerPlan); + + if (!TupIsNull(outerTupleSlot)) + return outerTupleSlot; + + gatherstate->need_to_scan_locally = false; + } + } + + return ExecClearTuple(slot); +} + +/* ---------------------------------------------------------------- + * ExecShutdownGather + * + * Destroy the setup for parallel workers. Collect all the + * stats after workers are stopped, else some work done by + * workers won't be accounted. + * ---------------------------------------------------------------- + */ +void +ExecShutdownGather(GatherState *node) +{ + Gather *gather; + + if (node->pei == NULL || node->pei->pcxt == NULL) + return; + + /* + * Ensure all workers have finished before destroying the parallel context + * to ensure a clean exit. 
+ */ + if (node->funnel) + { + DestroyTupleQueueFunnel(node->funnel); + node->funnel = NULL; + } + + ExecParallelFinish(node->pei); + + /* destroy parallel context. */ + DestroyParallelContext(node->pei->pcxt); + node->pei->pcxt = NULL; + + gather = (Gather *) node->ps.plan; + node->need_to_scan_locally = !gather->single_copy; + node->need_to_scan_workers = false; +} + +/* ---------------------------------------------------------------- + * Join Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecReScanGather + * + * Re-initialize the workers and rescans a relation via them. + * ---------------------------------------------------------------- + */ +void +ExecReScanGather(GatherState *node) +{ + /* + * Re-initialize the parallel context and workers to perform rescan of + * relation. We want to gracefully shutdown all the workers so that they + * should be able to propagate any error or other information to master + * backend before dying. 
+ */ + ExecShutdownGather(node); + + ExecReScan(node->ps.lefttree); +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 4b4ddec4c21..88dc0858707 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -314,6 +314,28 @@ _copyBitmapOr(const BitmapOr *from) return newnode; } +/* + * _copyGather + */ +static Gather * +_copyGather(const Gather *from) +{ + Gather *newnode = makeNode(Gather); + + /* + * copy node superclass fields + */ + CopyPlanFields((const Plan *) from, (Plan *) newnode); + + /* + * copy remainder of node + */ + COPY_SCALAR_FIELD(num_workers); + COPY_SCALAR_FIELD(single_copy); + + return newnode; +} + /* * CopyScanFields @@ -4235,6 +4257,9 @@ copyObject(const void *from) case T_Scan: retval = _copyScan(from); break; + case T_Gather: + retval = _copyGather(from); + break; case T_SeqScan: retval = _copySeqScan(from); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index ee9c360345f..4645ecb804f 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -433,6 +433,17 @@ _outBitmapOr(StringInfo str, const BitmapOr *node) } static void +_outGather(StringInfo str, const Gather *node) +{ + WRITE_NODE_TYPE("GATHER"); + + _outPlanInfo(str, (const Plan *) node); + + WRITE_UINT_FIELD(num_workers); + WRITE_UINT_FIELD(single_copy); +} + +static void _outScan(StringInfo str, const Scan *node) { WRITE_NODE_TYPE("SCAN"); @@ -3000,6 +3011,9 @@ _outNode(StringInfo str, const void *obj) case T_BitmapOr: _outBitmapOr(str, obj); break; + case T_Gather: + _outGather(str, obj); + break; case T_Scan: _outScan(str, obj); break; diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index d107d76a3c0..1b61fd9d4ea 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -11,6 +11,8 @@ * cpu_tuple_cost Cost of typical CPU time to process a tuple * cpu_index_tuple_cost Cost of typical CPU time 
to process an index tuple * cpu_operator_cost Cost of CPU time to execute an operator or function + * parallel_tuple_cost Cost of CPU time to pass a tuple from worker to master backend + * parallel_setup_cost Cost of setting up shared memory for parallelism * * We expect that the kernel will typically do some amount of read-ahead * optimization; this in conjunction with seek costs means that seq_page_cost @@ -102,11 +104,15 @@ double random_page_cost = DEFAULT_RANDOM_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; +double parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST; +double parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST; int effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE; Cost disable_cost = 1.0e10; +int max_parallel_degree = 0; + bool enable_seqscan = true; bool enable_indexscan = true; bool enable_indexonlyscan = true; @@ -290,6 +296,38 @@ cost_samplescan(Path *path, PlannerInfo *root, } /* + * cost_gather + * Determines and returns the cost of gather path. + * + * 'rel' is the relation to be operated upon + * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + */ +void +cost_gather(GatherPath *path, PlannerInfo *root, + RelOptInfo *rel, ParamPathInfo *param_info) +{ + Cost startup_cost = 0; + Cost run_cost = 0; + + /* Mark the path with the correct row estimate */ + if (param_info) + path->path.rows = param_info->ppi_rows; + else + path->path.rows = rel->rows; + + startup_cost = path->subpath->startup_cost; + + run_cost = path->subpath->total_cost - path->subpath->startup_cost; + + /* Parallel setup and communication cost. 
*/ + startup_cost += parallel_setup_cost; + run_cost += parallel_tuple_cost * rel->tuples; + + path->path.startup_cost = startup_cost; + path->path.total_cost = (startup_cost + run_cost); +} + +/* * cost_index * Determines and returns the cost of scanning a relation using an index. * diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 404c6f593d7..0ee7392bcce 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -60,6 +60,8 @@ static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); static SampleScan *create_samplescan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); +static Gather *create_gather_plan(PlannerInfo *root, + GatherPath *best_path); static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path, List *tlist, List *scan_clauses, bool indexonly); static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root, @@ -104,6 +106,8 @@ static void copy_plan_costsize(Plan *dest, Plan *src); static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid); static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid, TableSampleClause *tsc); +static Gather *make_gather(List *qptlist, List *qpqual, + int nworkers, bool single_copy, Plan *subplan); static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid, Oid indexid, List *indexqual, List *indexqualorig, List *indexorderby, List *indexorderbyorig, @@ -273,6 +277,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path) plan = create_unique_plan(root, (UniquePath *) best_path); break; + case T_Gather: + plan = (Plan *) create_gather_plan(root, + (GatherPath *) best_path); + break; default: elog(ERROR, "unrecognized node type: %d", (int) best_path->pathtype); @@ -1101,6 +1109,34 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path) return plan; } +/* + * 
create_gather_plan + * + * Create a Gather plan for 'best_path' and (recursively) plans + * for its subpaths. + */ +static Gather * +create_gather_plan(PlannerInfo *root, GatherPath *best_path) +{ + Gather *gather_plan; + Plan *subplan; + + subplan = create_plan_recurse(root, best_path->subpath); + + gather_plan = make_gather(subplan->targetlist, + NIL, + best_path->num_workers, + best_path->single_copy, + subplan); + + copy_path_costsize(&gather_plan->plan, &best_path->path); + + /* use parallel mode for parallel plans. */ + root->glob->parallelModeNeeded = true; + + return gather_plan; +} + /***************************************************************************** * @@ -4735,6 +4771,27 @@ make_unique(Plan *lefttree, List *distinctList) return node; } +static Gather * +make_gather(List *qptlist, + List *qpqual, + int nworkers, + bool single_copy, + Plan *subplan) +{ + Gather *node = makeNode(Gather); + Plan *plan = &node->plan; + + /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = subplan; + plan->righttree = NULL; + node->num_workers = nworkers; + node->single_copy = single_copy; + + return node; +} + /* * distinctList is a list of SortGroupClauses, identifying the targetlist * items that should be considered by the SetOp filter. 
The input path must diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 3c8169725a0..b1cede2ef0d 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -607,6 +607,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_Sort: case T_Unique: case T_SetOp: + case T_Gather: /* * These plan types don't actually bother to evaluate their diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index d0bc412c833..6b32f85d6c0 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2584,6 +2584,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params, case T_Material: case T_Sort: case T_Unique: + case T_Gather: case T_SetOp: case T_Group: break; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 4336ca1b782..1895a6894a3 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1308,6 +1308,32 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, } /* + * create_gather_path + * + * Creates a path corresponding to a gather scan, returning the + * pathnode. + */ +GatherPath * +create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, + Relids required_outer, int nworkers) +{ + GatherPath *pathnode = makeNode(GatherPath); + + pathnode->path.pathtype = T_Gather; + pathnode->path.parent = rel; + pathnode->path.param_info = get_baserel_parampathinfo(root, rel, + required_outer); + pathnode->path.pathkeys = NIL; /* Gather has unordered result */ + + pathnode->subpath = subpath; + pathnode->num_workers = nworkers; + + cost_gather(pathnode, root, rel, pathnode->path.param_info); + + return pathnode; +} + +/* * translate_sub_tlist - get subquery column numbers represented by tlist * * The given targetlist usually contains only Vars referencing the given relid. 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 17053aff68b..7684bff79b1 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2535,6 +2535,16 @@ static struct config_int ConfigureNamesInt[] = }, { + {"max_parallel_degree", PGC_SUSET, RESOURCES_ASYNCHRONOUS, + gettext_noop("Sets the maximum number of parallel processes per executor node."), + NULL + }, + &max_parallel_degree, + 0, 0, MAX_BACKENDS, + NULL, NULL, NULL + }, + + { {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM, gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."), NULL, @@ -2711,6 +2721,26 @@ static struct config_real ConfigureNamesReal[] = DEFAULT_CPU_OPERATOR_COST, 0, DBL_MAX, NULL, NULL, NULL }, + { + {"parallel_tuple_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "passing each tuple (row) from worker to master backend."), + NULL + }, + &parallel_tuple_cost, + DEFAULT_PARALLEL_TUPLE_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, + { + {"parallel_setup_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "starting up worker processes for parallel query."), + NULL + }, + &parallel_setup_cost, + DEFAULT_PARALLEL_SETUP_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, { {"cursor_tuple_fraction", PGC_USERSET, QUERY_TUNING_OTHER, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 8c65287e309..b2adda95958 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -164,6 +164,7 @@ #effective_io_concurrency = 1 # 1-1000; 0 disables prefetching #max_worker_processes = 8 +#max_parallel_degree = 0 # max number of worker processes per node #------------------------------------------------------------------------------ @@ -290,6 +291,8 @@ #cpu_tuple_cost = 0.01 # same scale as above #cpu_index_tuple_cost = 0.005 # same scale 
as above #cpu_operator_cost = 0.0025 # same scale as above +#parallel_tuple_cost = 0.1 # same scale as above +#parallel_setup_cost = 1000.0 # same scale as above #effective_cache_size = 4GB # - Genetic Query Optimizer - diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 226f905c3cc..4f77692aa32 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -225,6 +225,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags); extern TupleTableSlot *ExecProcNode(PlanState *node); extern Node *MultiExecProcNode(PlanState *node); extern void ExecEndNode(PlanState *node); +extern bool ExecShutdownNode(PlanState *node); /* * prototypes from functions in execQual.c diff --git a/src/include/executor/nodeGather.h b/src/include/executor/nodeGather.h new file mode 100644 index 00000000000..9e5d8fc1530 --- /dev/null +++ b/src/include/executor/nodeGather.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * nodeGather.h + * prototypes for nodeGather.c + * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodeGather.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEGATHER_H +#define NODEGATHER_H + +#include "nodes/execnodes.h" + +extern GatherState *ExecInitGather(Gather *node, EState *estate, int eflags); +extern TupleTableSlot *ExecGather(GatherState *node); +extern void ExecEndGather(GatherState *node); +extern void ExecShutdownGather(GatherState *node); +extern void ExecReScanGather(GatherState *node); + +#endif /* NODEGATHER_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4ae2f3e067b..b6895f94c39 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1951,6 +1951,22 @@ typedef struct UniqueState } 
UniqueState; /* ---------------- + * GatherState information + * + * Gather nodes launch 1 or more parallel workers, run a subplan + * in those workers, and collect the results. + * ---------------- + */ +typedef struct GatherState +{ + PlanState ps; /* its first field is NodeTag */ + struct ParallelExecutorInfo *pei; + struct TupleQueueFunnel *funnel; + bool need_to_scan_workers; + bool need_to_scan_locally; +} GatherState; + +/* ---------------- * HashState information * ---------------- */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 274480e2c92..94bdb7c9af5 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -74,6 +74,7 @@ typedef enum NodeTag T_Agg, T_WindowAgg, T_Unique, + T_Gather, T_Hash, T_SetOp, T_LockRows, @@ -121,6 +122,7 @@ typedef enum NodeTag T_AggState, T_WindowAggState, T_UniqueState, + T_GatherState, T_HashState, T_SetOpState, T_LockRowsState, @@ -238,6 +240,7 @@ typedef enum NodeTag T_ResultPath, T_MaterialPath, T_UniquePath, + T_GatherPath, T_EquivalenceClass, T_EquivalenceMember, T_PathKey, diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 1e2d2bbaa10..1f9213c09b0 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -753,6 +753,17 @@ typedef struct Unique Oid *uniqOperators; /* equality operators to compare with */ } Unique; +/* ------------ + * gather node + * ------------ + */ +typedef struct Gather +{ + Plan plan; + int num_workers; + bool single_copy; +} Gather; + /* ---------------- * hash build node * diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 961b5d17cfb..6cf2e24ce7d 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1047,6 +1047,19 @@ typedef struct UniquePath } UniquePath; /* + * GatherPath runs several copies of a plan in parallel and collects the + * results. The parallel leader may also execute the plan, unless the + * single_copy flag is set. 
+ */ +typedef struct GatherPath +{ + Path path; + Path *subpath; /* path for each worker */ + int num_workers; /* number of workers sought to help */ + bool single_copy; /* path must not be executed >1x */ +} GatherPath; + +/* * All join-type paths share these fields. */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index dd43e45d0c0..25a730362a8 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -26,6 +26,8 @@ #define DEFAULT_CPU_TUPLE_COST 0.01 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 #define DEFAULT_CPU_OPERATOR_COST 0.0025 +#define DEFAULT_PARALLEL_TUPLE_COST 0.1 +#define DEFAULT_PARALLEL_SETUP_COST 1000.0 #define DEFAULT_EFFECTIVE_CACHE_SIZE 524288 /* measured in pages */ @@ -48,8 +50,11 @@ extern PGDLLIMPORT double random_page_cost; extern PGDLLIMPORT double cpu_tuple_cost; extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost; +extern PGDLLIMPORT double parallel_tuple_cost; +extern PGDLLIMPORT double parallel_setup_cost; extern PGDLLIMPORT int effective_cache_size; extern Cost disable_cost; +extern int max_parallel_degree; extern bool enable_seqscan; extern bool enable_indexscan; extern bool enable_indexonlyscan; @@ -144,6 +149,8 @@ extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path, JoinCostWorkspace *workspace, SpecialJoinInfo *sjinfo, SemiAntiJoinFactors *semifactors); +extern void cost_gather(GatherPath *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan); extern void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root); extern void cost_qual_eval_node(QualCost *cost, Node *qual, PlannerInfo *root); diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 161644c343b..7a4940c7d20 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -69,6 +69,9 @@ extern ResultPath 
*create_result_path(List *quals); extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath); extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, SpecialJoinInfo *sjinfo); +extern GatherPath *create_gather_path(PlannerInfo *root, + RelOptInfo *rel, Path *subpath, Relids required_outer, + int nworkers); extern Path *create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, List *pathkeys, Relids required_outer); extern Path *create_functionscan_path(PlannerInfo *root, RelOptInfo *rel, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 0e149ea2f4d..feb821b409b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -707,6 +707,9 @@ FunctionParameterMode FunctionScan FunctionScanPerFuncState FunctionScanState +Gather +GatherPath +GatherState FuzzyAttrMatchState GBT_NUMKEY GBT_NUMKEY_R @@ -1195,6 +1198,7 @@ OverrideSearchPath OverrideStackEntry PACE_HEADER PACL +ParallelExecutorInfo PATH PBOOL PCtxtHandle |