author     Robert Haas <rhaas@postgresql.org>  2017-11-16 12:06:14 -0500
committer  Robert Haas <rhaas@postgresql.org>  2017-11-16 12:06:14 -0500
commit     e89a71fb449af2ef74f47be1175f99956cf21524 (patch)
tree       7a71a9bbe0ca8b02f518531eaa82ba9ea5a1c5c6 /src/backend/executor
parent     ff2d4356f8b18f5489e5d7b1f8b4b5357d088c8c (diff)
Pass InitPlan values to workers via Gather (Merge).

If a PARAM_EXEC parameter is used below a Gather (Merge) but the InitPlan
that computes it is attached to or above the Gather (Merge), force the
value to be computed before starting parallelism and pass it down to all
workers.  This allows us to use parallelism in cases where it previously
would have had to be rejected as unsafe.  We do, in this case, lose the
optimization that the value is only computed if it's actually used.  An
alternative strategy would be to have the first worker that needs the
value compute it, but one downside of that approach is that we'd then need
to select a parallel-safe path to compute the parameter value; it
couldn't, for example, contain a Gather (Merge) node.  At some point in
the future, we might want to consider both approaches.

Independent of that consideration, there is a great deal more work that
could be done to make more kinds of PARAM_EXEC parameters parallel-safe.
This infrastructure could be used to allow a Gather (Merge) on the inner
side of a nested loop (although that's not a very appealing plan), and
cases where the InitPlan is attached below the Gather (Merge) could be
addressed as well using various techniques.  But this is a good start.

Amit Kapila, reviewed and revised by me.  Reviewing and testing from
Kuntal Ghosh, Haribabu Kommi, and Tushar Ahuja.

Discussion: http://postgr.es/m/CAA4eK1LV0Y1AUV4cUCdC+sYOx0Z0-8NAJ2Pd9=UKsbQ5Sr7+JQ@mail.gmail.com
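In outline: before launching workers, the leader forces any InitPlans whose
output parameters must cross the Gather (Merge) boundary to be evaluated,
serializes the resulting datums into DSA memory, and hands the dsa_pointer
to workers through the fixed-size executor state; each worker then restores
the values into its own es_param_exec_vals array.  A condensed sketch of
that flow, paraphrasing the code added below rather than quoting it:

    /* Leader, in ExecInitParallelPlan (and again on each rescan in
     * ExecParallelReinitialize): */
    ExecEvalParamExecParams(sendParams, estate);    /* run pending InitPlans */
    if (!bms_is_empty(sendParams))
    {
        pei->param_exec = SerializeParamExecParams(estate, sendParams);
        fpes->param_exec = pei->param_exec;         /* visible to workers */
    }

    /* Worker, in ParallelQueryMain: */
    if (DsaPointerIsValid(fpes->param_exec))
    {
        char   *paramexec_space = dsa_get_address(area, fpes->param_exec);

        RestoreParamExecParams(paramexec_space, queryDesc->estate);
    }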
Diffstat (limited to 'src/backend/executor')
-rw-r--r--  src/backend/executor/execExprInterp.c   |  27
-rw-r--r--  src/backend/executor/execParallel.c     | 219
-rw-r--r--  src/backend/executor/nodeGather.c       |   4
-rw-r--r--  src/backend/executor/nodeGatherMerge.c  |   4
4 files changed, 235 insertions(+), 19 deletions(-)
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index a0f537b706b..6c4612dad4a 100644
--- a/src/backend/executor/execExprInterp.c
+++ b/src/backend/executor/execExprInterp.c
@@ -1927,6 +1927,33 @@ ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
}
/*
+ * ExecEvalParamExecParams
+ *
+ * Execute the subplans stored in the given PARAM_EXEC params, if they
+ * have not been executed yet.
+ */
+void
+ExecEvalParamExecParams(Bitmapset *params, EState *estate)
+{
+ ParamExecData *prm;
+ int paramid;
+
+ paramid = -1;
+ while ((paramid = bms_next_member(params, paramid)) >= 0)
+ {
+ prm = &(estate->es_param_exec_vals[paramid]);
+
+ if (prm->execPlan != NULL)
+ {
+ /* Parameter not evaluated yet, so go do it */
+ ExecSetParamPlan(prm->execPlan, GetPerTupleExprContext(estate));
+ /* ExecSetParamPlan should have processed this param... */
+ Assert(prm->execPlan == NULL);
+ }
+ }
+}
+
+/*
* Evaluate a PARAM_EXTERN parameter.
*
* PARAM_EXTERN parameters must be sought in ecxt_param_list_info.
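Ordinarily a PARAM_EXEC parameter backed by an InitPlan is evaluated
lazily: ExecEvalParamExec runs the subplan the first time the parameter is
read, and ExecSetParamPlan clears execPlan once the value is filled in.
The new function forces that evaluation eagerly for every parameter in the
given bitmapset, so the values are final before any worker is launched.  A
hypothetical caller-side fragment (the paramids are made up; in reality
they come from the Gather (Merge) node's initParam set):

    Bitmapset  *sendParams = NULL;

    sendParams = bms_add_member(sendParams, 0);     /* hypothetical paramid */
    sendParams = bms_add_member(sendParams, 3);     /* hypothetical paramid */

    /* run any still-pending InitPlans for params 0 and 3 */
    ExecEvalParamExecParams(sendParams, estate);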
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index fd7e7cbf3d3..c4355506378 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -23,6 +23,7 @@
#include "postgres.h"
+#include "executor/execExpr.h"
#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeBitmapHeapscan.h"
@@ -38,7 +39,9 @@
#include "optimizer/planner.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
+#include "utils/datum.h"
#include "utils/dsa.h"
+#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "pgstat.h"
@@ -50,7 +53,7 @@
*/
#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
@@ -65,6 +68,7 @@
typedef struct FixedParallelExecutorState
{
int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+ dsa_pointer param_exec;
} FixedParallelExecutorState;
/*
@@ -267,6 +271,133 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
}
/*
+ * Estimate the amount of space required to serialize the indicated parameters.
+ */
+static Size
+EstimateParamExecSpace(EState *estate, Bitmapset *params)
+{
+ int paramid;
+ Size sz = sizeof(int);
+
+ paramid = -1;
+ while ((paramid = bms_next_member(params, paramid)) >= 0)
+ {
+ Oid typeOid;
+ int16 typLen;
+ bool typByVal;
+ ParamExecData *prm;
+
+ prm = &(estate->es_param_exec_vals[paramid]);
+ typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
+ paramid);
+
+ sz = add_size(sz, sizeof(int)); /* space for paramid */
+
+ /* space for datum/isnull */
+ if (OidIsValid(typeOid))
+ get_typlenbyval(typeOid, &typLen, &typByVal);
+ else
+ {
+ /* If no type OID, assume by-value, like copyParamList does. */
+ typLen = sizeof(Datum);
+ typByVal = true;
+ }
+ sz = add_size(sz,
+ datumEstimateSpace(prm->value, prm->isnull,
+ typByVal, typLen));
+ }
+ return sz;
+}
+
+/*
+ * Serialize specified PARAM_EXEC parameters.
+ *
+ * We write the number of parameters first, as a 4-byte integer, and then
+ * write details for each parameter in turn. The details for each parameter
+ * consist of a 4-byte paramid (location of param in execution time internal
+ * parameter array) and then the datum as serialized by datumSerialize().
+ */
+static dsa_pointer
+SerializeParamExecParams(EState *estate, Bitmapset *params)
+{
+ Size size;
+ int nparams;
+ int paramid;
+ ParamExecData *prm;
+ dsa_pointer handle;
+ char *start_address;
+
+ /* Allocate enough space for the current parameter values. */
+ size = EstimateParamExecSpace(estate, params);
+ handle = dsa_allocate(estate->es_query_dsa, size);
+ start_address = dsa_get_address(estate->es_query_dsa, handle);
+
+ /* First write the number of parameters as a 4-byte integer. */
+ nparams = bms_num_members(params);
+ memcpy(start_address, &nparams, sizeof(int));
+ start_address += sizeof(int);
+
+ /* Write details for each parameter in turn. */
+ paramid = -1;
+ while ((paramid = bms_next_member(params, paramid)) >= 0)
+ {
+ Oid typeOid;
+ int16 typLen;
+ bool typByVal;
+
+ prm = &(estate->es_param_exec_vals[paramid]);
+ typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
+ paramid);
+
+ /* Write paramid. */
+ memcpy(start_address, &paramid, sizeof(int));
+ start_address += sizeof(int);
+
+ /* Write datum/isnull */
+ if (OidIsValid(typeOid))
+ get_typlenbyval(typeOid, &typLen, &typByVal);
+ else
+ {
+ /* If no type OID, assume by-value, like copyParamList does. */
+ typLen = sizeof(Datum);
+ typByVal = true;
+ }
+ datumSerialize(prm->value, prm->isnull, typByVal, typLen,
+ &start_address);
+ }
+
+ return handle;
+}
+
+/*
+ * Restore specified PARAM_EXEC parameters.
+ */
+static void
+RestoreParamExecParams(char *start_address, EState *estate)
+{
+ int nparams;
+ int i;
+ int paramid;
+
+ memcpy(&nparams, start_address, sizeof(int));
+ start_address += sizeof(int);
+
+ for (i = 0; i < nparams; i++)
+ {
+ ParamExecData *prm;
+
+ /* Read paramid */
+ memcpy(&paramid, start_address, sizeof(int));
+ start_address += sizeof(int);
+ prm = &(estate->es_param_exec_vals[paramid]);
+
+ /* Read datum/isnull. */
+ prm->value = datumRestore(&start_address, &prm->isnull);
+ prm->execPlan = NULL;
+ }
+}
+
+/*
* Initialize the dynamic shared memory segment that will be used to control
* parallel execution.
*/
@@ -395,7 +526,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
* execution and return results to the main backend.
*/
ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+ExecInitParallelPlan(PlanState *planstate, EState *estate,
+ Bitmapset *sendParams, int nworkers,
int64 tuples_needed)
{
ParallelExecutorInfo *pei;
@@ -405,17 +537,20 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
FixedParallelExecutorState *fpes;
char *pstmt_data;
char *pstmt_space;
- char *param_space;
+ char *paramlistinfo_space;
BufferUsage *bufusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
int pstmt_len;
- int param_len;
+ int paramlistinfo_len;
int instrumentation_len = 0;
int instrument_offset = 0;
Size dsa_minsize = dsa_minimum_size();
char *query_string;
int query_len;
+ /* Force parameters we're going to pass to workers to be evaluated. */
+ ExecEvalParamExecParams(sendParams, estate);
+
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
pei->finished = false;
@@ -450,8 +585,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for serialized ParamListInfo. */
- param_len = EstimateParamListSpace(estate->es_param_list_info);
- shm_toc_estimate_chunk(&pcxt->estimator, param_len);
+ paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
+ shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/*
@@ -511,6 +646,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
/* Store fixed-size state. */
fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
fpes->tuples_needed = tuples_needed;
+ fpes->param_exec = InvalidDsaPointer;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
/* Store query string */
@@ -524,9 +660,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
/* Store serialized ParamListInfo. */
- param_space = shm_toc_allocate(pcxt->toc, param_len);
- shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
- SerializeParamList(estate->es_param_list_info, &param_space);
+ paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
+ SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
/* Allocate space for each worker's BufferUsage; no need to initialize. */
bufusage_space = shm_toc_allocate(pcxt->toc,
@@ -577,13 +713,25 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
pei->area = dsa_create_in_place(area_space, dsa_minsize,
LWTRANCHE_PARALLEL_QUERY_DSA,
pcxt->seg);
- }
- /*
- * Make the area available to executor nodes running in the leader. See
- * also ParallelQueryMain which makes it available to workers.
- */
- estate->es_query_dsa = pei->area;
+ /*
+ * Make the area available to executor nodes running in the leader.
+ * See also ParallelQueryMain which makes it available to workers.
+ */
+ estate->es_query_dsa = pei->area;
+
+ /*
+ * Serialize parameters, if any, using DSA storage. We don't dare use
+ * the main parallel query DSM for this because we might relaunch
+ * workers after the values have changed (and thus the amount of
+ * storage required has changed).
+ */
+ if (!bms_is_empty(sendParams))
+ {
+ pei->param_exec = SerializeParamExecParams(estate, sendParams);
+ fpes->param_exec = pei->param_exec;
+ }
+ }
/*
* Give parallel-aware nodes a chance to initialize their shared data.
@@ -640,16 +788,39 @@ ExecParallelCreateReaders(ParallelExecutorInfo *pei)
*/
void
ExecParallelReinitialize(PlanState *planstate,
- ParallelExecutorInfo *pei)
+ ParallelExecutorInfo *pei,
+ Bitmapset *sendParams)
{
+ EState *estate = planstate->state;
+ FixedParallelExecutorState *fpes;
+
/* Old workers must already be shut down */
Assert(pei->finished);
+ /* Force parameters we're going to pass to workers to be evaluated. */
+ ExecEvalParamExecParams(sendParams, estate);
+
ReinitializeParallelDSM(pei->pcxt);
pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
pei->reader = NULL;
pei->finished = false;
+ fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
+ /* Free any serialized parameters from the last round. */
+ if (DsaPointerIsValid(fpes->param_exec))
+ {
+ dsa_free(estate->es_query_dsa, fpes->param_exec);
+ fpes->param_exec = InvalidDsaPointer;
+ }
+
+ /* Serialize current parameter values if required. */
+ if (!bms_is_empty(sendParams))
+ {
+ pei->param_exec = SerializeParamExecParams(estate, sendParams);
+ fpes->param_exec = pei->param_exec;
+ }
+
/* Traverse plan tree and let each child node reset associated state. */
ExecParallelReInitializeDSM(planstate, pei->pcxt);
}
@@ -831,6 +1002,12 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
void
ExecParallelCleanup(ParallelExecutorInfo *pei)
{
+ /* Free any serialized parameters. */
+ if (DsaPointerIsValid(pei->param_exec))
+ {
+ dsa_free(pei->area, pei->param_exec);
+ pei->param_exec = InvalidDsaPointer;
+ }
if (pei->area != NULL)
{
dsa_detach(pei->area);
@@ -882,7 +1059,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
pstmt = (PlannedStmt *) stringToNode(pstmtspace);
/* Reconstruct ParamListInfo. */
- paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false);
+ paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
paramLI = RestoreParamList(&paramspace);
/*
@@ -1046,6 +1223,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Special executor initialization steps for parallel workers */
queryDesc->planstate->state->es_query_dsa = area;
+ if (DsaPointerIsValid(fpes->param_exec))
+ {
+ char *paramexec_space;
+
+ paramexec_space = dsa_get_address(area, fpes->param_exec);
+ RestoreParamExecParams(paramexec_space, queryDesc->estate);
+ }
ExecParallelInitializeWorker(queryDesc->planstate, toc);
/* Pass down any tuple bound */
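The chunk handed to workers is self-describing: a 4-byte count, then for
each parameter a 4-byte paramid followed by the datum as written by
datumSerialize().  It lives in DSA rather than the main parallel DSM
precisely because rescans can change the values, and hence the space
required, between worker launches.  A minimal stand-alone sketch of the
layout for one by-value int4 parameter, using plain palloc'd memory
instead of DSA (the helper name and its use of a single param are
hypothetical):

    #include "postgres.h"
    #include "utils/datum.h"

    /* Hypothetical helper: write one by-value int4 param using the same
     * layout as SerializeParamExecParams above. */
    static char *
    serialize_one_int4_param(int paramid, Datum value, Size *len)
    {
        int         nparams = 1;
        char       *buf;
        char       *p;

        /* int4 is pass-by-value with typlen 4 */
        *len = sizeof(int)          /* nparams */
            + sizeof(int)           /* paramid */
            + datumEstimateSpace(value, false, true, sizeof(int32));
        p = buf = palloc(*len);

        memcpy(p, &nparams, sizeof(int));
        p += sizeof(int);
        memcpy(p, &paramid, sizeof(int));
        p += sizeof(int);
        datumSerialize(value, false, true, sizeof(int32), &p);

        return buf;
    }

Reading the buffer back mirrors RestoreParamExecParams: consume the count,
then for each entry read the paramid and call datumRestore(&p, &isnull).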
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 0298c65d065..07c62d2feab 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -160,11 +160,13 @@ ExecGather(PlanState *pstate)
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
+ gather->initParam,
gather->num_workers,
node->tuples_needed);
else
ExecParallelReinitialize(node->ps.lefttree,
- node->pei);
+ node->pei,
+ gather->initParam);
/*
* Register backend workers. We might not get as many as we
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 7206ab91975..7dd655c4489 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -203,11 +203,13 @@ ExecGatherMerge(PlanState *pstate)
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
+ gm->initParam,
gm->num_workers,
node->tuples_needed);
else
ExecParallelReinitialize(node->ps.lefttree,
- node->pei);
+ node->pei,
+ gm->initParam);
/* Try to launch workers. */
pcxt = node->pei->pcxt;