Diffstat (limited to 'src/backend/executor')
-rw-r--r--   src/backend/executor/Makefile          4
-rw-r--r--   src/backend/executor/execAmi.c         8
-rw-r--r--   src/backend/executor/execMain.c        3
-rw-r--r--   src/backend/executor/execParallel.c   14
-rw-r--r--   src/backend/executor/execProcnode.c   46
-rw-r--r--   src/backend/executor/nodeGather.c     299
6 files changed, 366 insertions, 8 deletions
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index f5e1e1aefcd..51edd4c5e70 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -17,8 +17,8 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
execScan.o execTuples.o \
execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
nodeBitmapAnd.o nodeBitmapOr.o \
- nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \
- nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
+ nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeGather.o \
+ nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
nodeLimit.o nodeLockRows.o \
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 93e1e9a691c..163650cecd1 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -24,6 +24,7 @@
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeFunctionscan.h"
+#include "executor/nodeGather.h"
#include "executor/nodeGroup.h"
#include "executor/nodeGroup.h"
#include "executor/nodeHash.h"
@@ -160,6 +161,10 @@ ExecReScan(PlanState *node)
ExecReScanSampleScan((SampleScanState *) node);
break;
+ case T_GatherState:
+ ExecReScanGather((GatherState *) node);
+ break;
+
case T_IndexScanState:
ExecReScanIndexScan((IndexScanState *) node);
break;
@@ -467,6 +472,9 @@ ExecSupportsBackwardScan(Plan *node)
/* Simplify life for tablesample methods by disallowing this */
return false;
+ case T_Gather:
+ return false;
+
case T_IndexScan:
return IndexSupportsBackwardScan(((IndexScan *) node)->indexid) &&
TargetListSupportsBackwardScan(node->targetlist);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 85ff46b8026..37b7bbd413b 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -347,6 +347,9 @@ standard_ExecutorRun(QueryDesc *queryDesc,
direction,
dest);
+ /* Allow nodes to release or shut down resources. */
+ (void) ExecShutdownNode(queryDesc->planstate);
+
/*
* shutdown tuple receiver, if we started it
*/
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index a409a9a571f..e6930c1d51c 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -71,7 +71,7 @@ typedef struct ExecParallelInitializeDSMContext
} ExecParallelInitializeDSMContext;
/* Helper functions that run in the parallel leader. */
-static char *ExecSerializePlan(Plan *plan, List *rangetable);
+static char *ExecSerializePlan(Plan *plan, EState *estate);
static bool ExecParallelEstimate(PlanState *node,
ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *node,
@@ -88,7 +88,7 @@ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
* Create a serialized representation of the plan to be sent to each worker.
*/
static char *
-ExecSerializePlan(Plan *plan, List *rangetable)
+ExecSerializePlan(Plan *plan, EState *estate)
{
PlannedStmt *pstmt;
ListCell *tlist;
@@ -125,13 +125,13 @@ ExecSerializePlan(Plan *plan, List *rangetable)
pstmt->canSetTag = 1;
pstmt->transientPlan = 0;
pstmt->planTree = plan;
- pstmt->rtable = rangetable;
+ pstmt->rtable = estate->es_range_table;
pstmt->resultRelations = NIL;
pstmt->utilityStmt = NULL;
pstmt->subplans = NIL;
pstmt->rewindPlanIDs = NULL;
pstmt->rowMarks = NIL;
- pstmt->nParamExec = 0;
+ pstmt->nParamExec = estate->es_plannedstmt->nParamExec;
pstmt->relationOids = NIL;
pstmt->invalItems = NIL; /* workers can't replan anyway... */
pstmt->hasRowSecurity = false;
@@ -271,7 +271,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
pei->planstate = planstate;
/* Fix up and serialize plan to be sent to workers. */
- pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table);
+ pstmt_data = ExecSerializePlan(planstate->plan, estate);
/* Create a parallel context. */
pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
@@ -568,7 +568,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ExecutorStart(queryDesc, 0);
ExecutorRun(queryDesc, ForwardScanDirection, 0L);
ExecutorFinish(queryDesc);
- ExecutorEnd(queryDesc);
/* Report buffer usage during parallel execution. */
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
@@ -579,6 +578,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
ExecParallelReportInstrumentation(queryDesc->planstate,
instrumentation);
+ /* Must do this after capturing instrumentation. */
+ ExecutorEnd(queryDesc);
+
/* Cleanup. */
FreeQueryDesc(queryDesc);
(*receiver->rDestroy) (receiver);
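The reordering above matters because per-node instrumentation lives in the executor state that ExecutorEnd tears down. A simplified sketch of the worker-side ordering in ParallelQueryMain after this change, with the buffer-usage reporting that this hunk elides reduced to a comment:

	/* Run the plan to completion and finish the executor. */
	ExecutorStart(queryDesc, 0);
	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
	ExecutorFinish(queryDesc);

	/* ... report buffer usage to the leader (elided in the hunk above) ... */

	/* Copy per-node instrumentation into shared memory, if requested. */
	if (instrumentation != NULL)
		ExecParallelReportInstrumentation(queryDesc->planstate,
										  instrumentation);

	/*
	 * Only now destroy the executor state; doing this any earlier would
	 * free the planstate tree the instrumentation was just read from.
	 */
	ExecutorEnd(queryDesc);

	/* Cleanup. */
	FreeQueryDesc(queryDesc);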
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 03c2febc3e1..5bc1d489421 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -100,6 +100,7 @@
#include "executor/nodeMergejoin.h"
#include "executor/nodeModifyTable.h"
#include "executor/nodeNestloop.h"
+#include "executor/nodeGather.h"
#include "executor/nodeRecursiveunion.h"
#include "executor/nodeResult.h"
#include "executor/nodeSamplescan.h"
@@ -113,6 +114,7 @@
#include "executor/nodeValuesscan.h"
#include "executor/nodeWindowAgg.h"
#include "executor/nodeWorktablescan.h"
+#include "nodes/nodeFuncs.h"
#include "miscadmin.h"
@@ -307,6 +309,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
estate, eflags);
break;
+ case T_Gather:
+ result = (PlanState *) ExecInitGather((Gather *) node,
+ estate, eflags);
+ break;
+
case T_Hash:
result = (PlanState *) ExecInitHash((Hash *) node,
estate, eflags);
@@ -504,6 +511,10 @@ ExecProcNode(PlanState *node)
result = ExecUnique((UniqueState *) node);
break;
+ case T_GatherState:
+ result = ExecGather((GatherState *) node);
+ break;
+
case T_HashState:
result = ExecHash((HashState *) node);
break;
@@ -658,6 +669,10 @@ ExecEndNode(PlanState *node)
ExecEndSampleScan((SampleScanState *) node);
break;
+ case T_GatherState:
+ ExecEndGather((GatherState *) node);
+ break;
+
case T_IndexScanState:
ExecEndIndexScan((IndexScanState *) node);
break;
@@ -769,3 +784,34 @@ ExecEndNode(PlanState *node)
break;
}
}
+
+/*
+ * ExecShutdownNode
+ *
+ * Give execution nodes a chance to stop asynchronous resource consumption
+ * and release any resources still held. Currently, this is only used for
+ * parallel query, but we might want to extend it to other cases also (e.g.
+ * FDW). We might also want to call it sooner, as soon as it's evident that
+ * no more rows will be needed (e.g. when a Limit is filled) rather than only
+ * at the end of ExecutorRun.
+ */
+bool
+ExecShutdownNode(PlanState *node)
+{
+ if (node == NULL)
+ return false;
+
+ switch (nodeTag(node))
+ {
+ case T_GatherState:
+ {
+ ExecShutdownGather((GatherState *) node);
+ return true;
+ }
+ break;
+ default:
+ break;
+ }
+
+ return planstate_tree_walker(node, ExecShutdownNode, NULL);
+}
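As the comment above notes, only Gather uses this hook so far, but other node types could be added the same way. A hypothetical sketch of the dispatch switch with a second case; T_FooScanState and ExecShutdownFooScan are invented names for illustration and exist neither in this patch nor in PostgreSQL:

	switch (nodeTag(node))
	{
		case T_GatherState:
			ExecShutdownGather((GatherState *) node);
			return true;

		case T_FooScanState:	/* hypothetical node type, not in this patch */
			ExecShutdownFooScan((FooScanState *) node);
			return true;

		default:
			break;
	}

	/* Anything else: recurse so that child nodes still get shut down. */
	return planstate_tree_walker(node, ExecShutdownNode, NULL);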
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
new file mode 100644
index 00000000000..735dbaa2226
--- /dev/null
+++ b/src/backend/executor/nodeGather.c
@@ -0,0 +1,299 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeGather.c
+ * Support routines for scanning a plan via multiple workers.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/executor/nodeGather.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/relscan.h"
+#include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/nodeGather.h"
+#include "executor/nodeSubplan.h"
+#include "executor/tqueue.h"
+#include "utils/rel.h"
+
+
+static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+
+
+/* ----------------------------------------------------------------
+ * ExecInitGather
+ * ----------------------------------------------------------------
+ */
+GatherState *
+ExecInitGather(Gather *node, EState *estate, int eflags)
+{
+ GatherState *gatherstate;
+
+ /* Gather node doesn't have innerPlan node. */
+ Assert(innerPlan(node) == NULL);
+
+ /*
+ * create state structure
+ */
+ gatherstate = makeNode(GatherState);
+ gatherstate->ps.plan = (Plan *) node;
+ gatherstate->ps.state = estate;
+ gatherstate->need_to_scan_workers = false;
+ gatherstate->need_to_scan_locally = !node->single_copy;
+
+ /*
+ * Miscellaneous initialization
+ *
+ * create expression context for node
+ */
+ ExecAssignExprContext(estate, &gatherstate->ps);
+
+ /*
+ * initialize child expressions
+ */
+ gatherstate->ps.targetlist = (List *)
+ ExecInitExpr((Expr *) node->plan.targetlist,
+ (PlanState *) gatherstate);
+ gatherstate->ps.qual = (List *)
+ ExecInitExpr((Expr *) node->plan.qual,
+ (PlanState *) gatherstate);
+
+ /*
+ * tuple table initialization
+ */
+ ExecInitResultTupleSlot(estate, &gatherstate->ps);
+
+ /*
+ * now initialize outer plan
+ */
+ outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags);
+
+
+ gatherstate->ps.ps_TupFromTlist = false;
+
+ /*
+ * Initialize result tuple type and projection info.
+ */
+ ExecAssignResultTypeFromTL(&gatherstate->ps);
+ ExecAssignProjectionInfo(&gatherstate->ps, NULL);
+
+ return gatherstate;
+}
+
+/* ----------------------------------------------------------------
+ * ExecGather(node)
+ *
+ * Scans the relation via multiple workers and returns
+ * the next qualifying tuple.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecGather(GatherState *node)
+{
+ int i;
+ TupleTableSlot *slot;
+
+ /*
+ * Initialize the parallel context and workers on first execution. We do
+ * this on first execution rather than during node initialization because
+ * it needs to allocate a large dynamic shared memory segment, so it is
+ * better to do so only if it is really needed.
+ */
+ if (!node->pei)
+ {
+ EState *estate = node->ps.state;
+
+ /* Initialize the workers required to execute the Gather node. */
+ node->pei = ExecInitParallelPlan(node->ps.lefttree,
+ estate,
+ ((Gather *) (node->ps.plan))->num_workers);
+
+ /*
+ * Register backend workers. If fewer than the requested number of
+ * workers are available, we perform the scan with whatever workers
+ * could be launched; if no workers are available at all, the Gather
+ * node just scans locally.
+ */
+ LaunchParallelWorkers(node->pei->pcxt);
+
+ node->funnel = CreateTupleQueueFunnel();
+
+ for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+ {
+ if (node->pei->pcxt->worker[i].bgwhandle)
+ {
+ shm_mq_set_handle(node->pei->tqueue[i],
+ node->pei->pcxt->worker[i].bgwhandle);
+ RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]);
+ node->need_to_scan_workers = true;
+ }
+ }
+
+ /* If no workers are available, we must always scan locally. */
+ if (!node->need_to_scan_workers)
+ node->need_to_scan_locally = true;
+ }
+
+ slot = gather_getnext(node);
+
+ if (TupIsNull(slot))
+ {
+ /*
+ * Destroy the parallel context once we complete fetching all the
+ * tuples. Otherwise, the DSM and workers will stick around for the
+ * lifetime of the entire statement.
+ */
+ ExecShutdownGather(node);
+ }
+ return slot;
+}
+
+/* ----------------------------------------------------------------
+ * ExecEndGather
+ *
+ * frees any storage allocated through C routines.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndGather(GatherState *node)
+{
+ ExecShutdownGather(node);
+ ExecFreeExprContext(&node->ps);
+ ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ ExecEndNode(outerPlanState(node));
+}
+
+/*
+ * gather_getnext
+ *
+ * Get the next tuple from the shared memory queues. This function is
+ * responsible for fetching tuples from all the queues associated with
+ * the worker backends used in Gather node execution; if no data is
+ * available from the queues, or no worker is available, it fetches the
+ * data by executing the plan locally.
+ */
+static TupleTableSlot *
+gather_getnext(GatherState *gatherstate)
+{
+ PlanState *outerPlan;
+ TupleTableSlot *outerTupleSlot;
+ TupleTableSlot *slot;
+ HeapTuple tup;
+
+ /*
+ * We can use the Gather node's projection info for the tuples received
+ * from worker backends, because the workers currently always send
+ * tuples already projected as the Gather node requires.
+ */
+ slot = gatherstate->ps.ps_ProjInfo->pi_slot;
+
+ while (gatherstate->need_to_scan_workers ||
+ gatherstate->need_to_scan_locally)
+ {
+ if (gatherstate->need_to_scan_workers)
+ {
+ bool done = false;
+
+ /* wait only if local scan is done */
+ tup = TupleQueueFunnelNext(gatherstate->funnel,
+ gatherstate->need_to_scan_locally,
+ &done);
+ if (done)
+ gatherstate->need_to_scan_workers = false;
+
+ if (HeapTupleIsValid(tup))
+ {
+ ExecStoreTuple(tup, /* tuple to store */
+ slot, /* slot to store in */
+ InvalidBuffer, /* buffer associated with this
+ * tuple */
+ true); /* pfree this pointer if not from heap */
+
+ return slot;
+ }
+ }
+
+ if (gatherstate->need_to_scan_locally)
+ {
+ outerPlan = outerPlanState(gatherstate);
+
+ outerTupleSlot = ExecProcNode(outerPlan);
+
+ if (!TupIsNull(outerTupleSlot))
+ return outerTupleSlot;
+
+ gatherstate->need_to_scan_locally = false;
+ }
+ }
+
+ return ExecClearTuple(slot);
+}
+
+/* ----------------------------------------------------------------
+ * ExecShutdownGather
+ *
+ * Destroy the setup for parallel workers. Collect all the
+ * stats after the workers are stopped, else some work done by
+ * the workers won't be accounted for.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+ Gather *gather;
+
+ if (node->pei == NULL || node->pei->pcxt == NULL)
+ return;
+
+ /*
+ * Make sure all workers have finished before destroying the parallel
+ * context, so that we get a clean exit.
+ */
+ if (node->funnel)
+ {
+ DestroyTupleQueueFunnel(node->funnel);
+ node->funnel = NULL;
+ }
+
+ ExecParallelFinish(node->pei);
+
+ /* destroy parallel context. */
+ DestroyParallelContext(node->pei->pcxt);
+ node->pei->pcxt = NULL;
+
+ gather = (Gather *) node->ps.plan;
+ node->need_to_scan_locally = !gather->single_copy;
+ node->need_to_scan_workers = false;
+}
+
+/* ----------------------------------------------------------------
+ * Join Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecReScanGather
+ *
+ * Re-initialize the workers and rescan the relation via them.
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanGather(GatherState *node)
+{
+ /*
+ * Re-initialize the parallel context and workers to perform a rescan of
+ * the relation. We want to gracefully shut down all the workers so that
+ * they can propagate any error or other information to the master
+ * backend before dying.
+ */
+ ExecShutdownGather(node);
+
+ ExecReScan(node->ps.lefttree);
+}