Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--  src/backend/executor/execParallel.c | 219
1 file changed, 202 insertions, 17 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index fd7e7cbf3d3..c4355506378 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -23,6 +23,7 @@
 
 #include "postgres.h"
 
+#include "executor/execExpr.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
 #include "executor/nodeBitmapHeapscan.h"
@@ -38,7 +39,9 @@
 #include "optimizer/planner.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
+#include "utils/datum.h"
 #include "utils/dsa.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
 #include "pgstat.h"
@@ -50,7 +53,7 @@
  */
 #define PARALLEL_KEY_EXECUTOR_FIXED    UINT64CONST(0xE000000000000001)
 #define PARALLEL_KEY_PLANNEDSTMT       UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_PARAMS            UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_PARAMLISTINFO     UINT64CONST(0xE000000000000003)
 #define PARALLEL_KEY_BUFFER_USAGE      UINT64CONST(0xE000000000000004)
 #define PARALLEL_KEY_TUPLE_QUEUE       UINT64CONST(0xE000000000000005)
 #define PARALLEL_KEY_INSTRUMENTATION   UINT64CONST(0xE000000000000006)
@@ -65,6 +68,7 @@ typedef struct FixedParallelExecutorState
 typedef struct FixedParallelExecutorState
 {
     int64       tuples_needed;  /* tuple bound, see ExecSetTupleBound */
+    dsa_pointer param_exec;
 } FixedParallelExecutorState;
 
 /*
@@ -267,6 +271,133 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 }
 
 /*
+ * Estimate the amount of space required to serialize the indicated parameters.
+ */
+static Size
+EstimateParamExecSpace(EState *estate, Bitmapset *params)
+{
+    int         paramid;
+    Size        sz = sizeof(int);
+
+    paramid = -1;
+    while ((paramid = bms_next_member(params, paramid)) >= 0)
+    {
+        Oid         typeOid;
+        int16       typLen;
+        bool        typByVal;
+        ParamExecData *prm;
+
+        prm = &(estate->es_param_exec_vals[paramid]);
+        typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
+                               paramid);
+
+        sz = add_size(sz, sizeof(int)); /* space for paramid */
+
+        /* space for datum/isnull */
+        if (OidIsValid(typeOid))
+            get_typlenbyval(typeOid, &typLen, &typByVal);
+        else
+        {
+            /* If no type OID, assume by-value, like copyParamList does. */
+            typLen = sizeof(Datum);
+            typByVal = true;
+        }
+        sz = add_size(sz,
+                      datumEstimateSpace(prm->value, prm->isnull,
+                                         typByVal, typLen));
+    }
+    return sz;
+}
+
+/*
+ * Serialize specified PARAM_EXEC parameters.
+ *
+ * We write the number of parameters first, as a 4-byte integer, and then
+ * write details for each parameter in turn.  The details for each parameter
+ * consist of a 4-byte paramid (location of param in execution time internal
+ * parameter array) and then the datum as serialized by datumSerialize().
+ */
+static dsa_pointer
+SerializeParamExecParams(EState *estate, Bitmapset *params)
+{
+    Size        size;
+    int         nparams;
+    int         paramid;
+    ParamExecData *prm;
+    dsa_pointer handle;
+    char       *start_address;
+
+    /* Allocate enough space for the current parameter values. */
+    size = EstimateParamExecSpace(estate, params);
+    handle = dsa_allocate(estate->es_query_dsa, size);
+    start_address = dsa_get_address(estate->es_query_dsa, handle);
+
+    /* First write the number of parameters as a 4-byte integer. */
+    nparams = bms_num_members(params);
+    memcpy(start_address, &nparams, sizeof(int));
+    start_address += sizeof(int);
+
+    /* Write details for each parameter in turn. */
+    paramid = -1;
+    while ((paramid = bms_next_member(params, paramid)) >= 0)
+    {
+        Oid         typeOid;
+        int16       typLen;
+        bool        typByVal;
+
+        prm = &(estate->es_param_exec_vals[paramid]);
+        typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
+                               paramid);
+
+        /* Write paramid. */
+        memcpy(start_address, &paramid, sizeof(int));
+        start_address += sizeof(int);
+
+        /* Write datum/isnull */
+        if (OidIsValid(typeOid))
+            get_typlenbyval(typeOid, &typLen, &typByVal);
+        else
+        {
+            /* If no type OID, assume by-value, like copyParamList does. */
+            typLen = sizeof(Datum);
+            typByVal = true;
+        }
+        datumSerialize(prm->value, prm->isnull, typByVal, typLen,
+                       &start_address);
+    }
+
+    return handle;
+}
+
+/*
+ * Restore specified PARAM_EXEC parameters.
+ */
+static void
+RestoreParamExecParams(char *start_address, EState *estate)
+{
+    int         nparams;
+    int         i;
+    int         paramid;
+
+    memcpy(&nparams, start_address, sizeof(int));
+    start_address += sizeof(int);
+
+    for (i = 0; i < nparams; i++)
+    {
+        ParamExecData *prm;
+
+        /* Read paramid */
+        memcpy(&paramid, start_address, sizeof(int));
+        start_address += sizeof(int);
+        prm = &(estate->es_param_exec_vals[paramid]);
+
+        /* Read datum/isnull. */
+        prm->value = datumRestore(&start_address, &prm->isnull);
+        prm->execPlan = NULL;
+    }
+}
+
+/*
  * Initialize the dynamic shared memory segment that will be used to control
  * parallel execution.
  */
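The comment on SerializeParamExecParams() above pins down the on-wire framing
exactly: a 4-byte parameter count, then one 4-byte paramid plus one serialized
datum per parameter.  The standalone sketch below mirrors only that framing,
under simplifying assumptions: every datum is by-value and null-ness is a
single flag byte, whereas the real code defers to datumSerialize() and
datumRestore(), which also handle by-reference types.  All names here
(toy_serialize_params, the Datum typedef) are invented for illustration.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uintptr_t Datum;            /* stand-in for PostgreSQL's Datum */

/*
 * Mirror the framing of SerializeParamExecParams(): a 4-byte count, then
 * per parameter a 4-byte paramid followed by the datum.  Nulls are one
 * flag byte here; datumSerialize() uses a richer header instead.
 */
static char *
toy_serialize_params(char *buf, int nparams, const int *ids,
                     const Datum *values, const char *isnull)
{
    memcpy(buf, &nparams, sizeof(int));
    buf += sizeof(int);
    for (int i = 0; i < nparams; i++)
    {
        memcpy(buf, &ids[i], sizeof(int));      /* paramid */
        buf += sizeof(int);
        *buf++ = isnull[i];                     /* null flag */
        if (!isnull[i])
        {
            memcpy(buf, &values[i], sizeof(Datum));
            buf += sizeof(Datum);
        }
    }
    return buf;                 /* first byte past the payload */
}

int
main(void)
{
    char    buf[64];
    int     ids[] = {0, 2};
    Datum   vals[] = {42, 0};
    char    nulls[] = {0, 1};
    char   *end = toy_serialize_params(buf, 2, ids, vals, nulls);

    printf("serialized %zu bytes\n", (size_t) (end - buf));
    return 0;
}

RestoreParamExecParams() walks the same layout in reverse, which is why the
count comes first: the reader has no other way to know how many
(paramid, datum) pairs follow.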
@@ -395,7 +526,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+ExecInitParallelPlan(PlanState *planstate, EState *estate,
+                     Bitmapset *sendParams, int nworkers,
                      int64 tuples_needed)
 {
     ParallelExecutorInfo *pei;
@@ -405,17 +537,20 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
     FixedParallelExecutorState *fpes;
     char       *pstmt_data;
     char       *pstmt_space;
-    char       *param_space;
+    char       *paramlistinfo_space;
     BufferUsage *bufusage_space;
     SharedExecutorInstrumentation *instrumentation = NULL;
     int         pstmt_len;
-    int         param_len;
+    int         paramlistinfo_len;
     int         instrumentation_len = 0;
     int         instrument_offset = 0;
     Size        dsa_minsize = dsa_minimum_size();
     char       *query_string;
     int         query_len;
 
+    /* Force parameters we're going to pass to workers to be evaluated. */
+    ExecEvalParamExecParams(sendParams, estate);
+
     /* Allocate object for return value. */
     pei = palloc0(sizeof(ParallelExecutorInfo));
     pei->finished = false;
@@ -450,8 +585,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
     shm_toc_estimate_keys(&pcxt->estimator, 1);
 
     /* Estimate space for serialized ParamListInfo. */
-    param_len = EstimateParamListSpace(estate->es_param_list_info);
-    shm_toc_estimate_chunk(&pcxt->estimator, param_len);
+    paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
+    shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
     shm_toc_estimate_keys(&pcxt->estimator, 1);
 
     /*
@@ -511,6 +646,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
     /* Store fixed-size state. */
     fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
     fpes->tuples_needed = tuples_needed;
+    fpes->param_exec = InvalidDsaPointer;
     shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
 
     /* Store query string */
@@ -524,9 +660,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
     shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
 
     /* Store serialized ParamListInfo. */
-    param_space = shm_toc_allocate(pcxt->toc, param_len);
-    shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
-    SerializeParamList(estate->es_param_list_info, &param_space);
+    paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
+    shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
+    SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
 
     /* Allocate space for each worker's BufferUsage; no need to initialize. */
     bufusage_space = shm_toc_allocate(pcxt->toc,
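One reason ExecEvalParamExecParams(sendParams, estate) runs before any of the
serialization above: PARAM_EXEC slots are normally filled lazily, and a slot
that has not been evaluated yet holds no usable value (the restore path's
prm->execPlan = NULL reflects the same bookkeeping).  The requested
parameters therefore have to be forced before their space can be estimated or
their values copied.  A toy model of that ordering, with every name
(ToyParam, force_param) invented for the illustration:

#include <stdbool.h>
#include <stdio.h>

typedef struct ToyParam
{
    bool    evaluated;          /* stands in for "execPlan == NULL" */
    long    value;
} ToyParam;

/* Evaluate a lazily-filled slot on demand, as the executor would. */
static void
force_param(ToyParam *p, int id)
{
    if (!p->evaluated)
    {
        p->value = 100 + id;    /* pretend to run the subplan */
        p->evaluated = true;
    }
}

int
main(void)
{
    ToyParam params[3] = {{false, 0}, {false, 0}, {false, 0}};
    int      send[] = {0, 2};   /* the "sendParams" set */

    /* Force only the params we intend to ship, like the bms walk above. */
    for (int i = 0; i < 2; i++)
        force_param(&params[send[i]], send[i]);

    printf("param 0 = %ld, param 2 = %ld\n",
           params[0].value, params[2].value);
    return 0;
}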
@@ -577,13 +713,25 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
         pei->area = dsa_create_in_place(area_space, dsa_minsize,
                                         LWTRANCHE_PARALLEL_QUERY_DSA,
                                         pcxt->seg);
-    }
 
-    /*
-     * Make the area available to executor nodes running in the leader.  See
-     * also ParallelQueryMain which makes it available to workers.
-     */
-    estate->es_query_dsa = pei->area;
+        /*
+         * Make the area available to executor nodes running in the leader.
+         * See also ParallelQueryMain which makes it available to workers.
+         */
+        estate->es_query_dsa = pei->area;
+
+        /*
+         * Serialize parameters, if any, using DSA storage.  We don't dare use
+         * the main parallel query DSM for this because we might relaunch
+         * workers after the values have changed (and thus the amount of
+         * storage required has changed).
+         */
+        if (!bms_is_empty(sendParams))
+        {
+            pei->param_exec = SerializeParamExecParams(estate, sendParams);
+            fpes->param_exec = pei->param_exec;
+        }
+    }
 
     /*
      * Give parallel-aware nodes a chance to initialize their shared data.
@@ -640,16 +788,39 @@ ExecParallelCreateReaders(ParallelExecutorInfo *pei)
  */
 void
 ExecParallelReinitialize(PlanState *planstate,
-                         ParallelExecutorInfo *pei)
+                         ParallelExecutorInfo *pei,
+                         Bitmapset *sendParams)
 {
+    EState     *estate = planstate->state;
+    FixedParallelExecutorState *fpes;
+
     /* Old workers must already be shut down */
     Assert(pei->finished);
 
+    /* Force parameters we're going to pass to workers to be evaluated. */
+    ExecEvalParamExecParams(sendParams, estate);
+
     ReinitializeParallelDSM(pei->pcxt);
     pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
     pei->reader = NULL;
     pei->finished = false;
 
+    fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
+    /* Free any serialized parameters from the last round. */
+    if (DsaPointerIsValid(fpes->param_exec))
+    {
+        dsa_free(estate->es_query_dsa, fpes->param_exec);
+        fpes->param_exec = InvalidDsaPointer;
+    }
+
+    /* Serialize current parameter values if required. */
+    if (!bms_is_empty(sendParams))
+    {
+        pei->param_exec = SerializeParamExecParams(estate, sendParams);
+        fpes->param_exec = pei->param_exec;
+    }
+
     /* Traverse plan tree and let each child node reset associated state. */
     ExecParallelReInitializeDSM(planstate, pei->pcxt);
 }
@@ -831,6 +1002,12 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+    /* Free any serialized parameters. */
+    if (DsaPointerIsValid(pei->param_exec))
+    {
+        dsa_free(pei->area, pei->param_exec);
+        pei->param_exec = InvalidDsaPointer;
+    }
     if (pei->area != NULL)
     {
         dsa_detach(pei->area);
@@ -882,7 +1059,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
     pstmt = (PlannedStmt *) stringToNode(pstmtspace);
 
     /* Reconstruct ParamListInfo. */
-    paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false);
+    paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
     paramLI = RestoreParamList(&paramspace);
 
     /*
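The "don't dare use the main parallel query DSM" comment above is the crux of
the design: shared-memory TOC chunks are carved out once, at a size fixed
when the segment is set up, but ExecParallelReinitialize() can relaunch
workers after the parameter values (and therefore the space they need) have
changed.  Keeping the serialized parameters in DSA lets each round free the
old chunk and allocate a fresh one of the right size.  A minimal sketch of
that free-then-reserialize lifecycle, with malloc/free standing in for
dsa_allocate/dsa_free and serialize_round invented for the illustration:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Size the chunk from the current value, as SerializeParamExecParams does. */
static char *
serialize_round(const char *value)
{
    char *chunk = malloc(strlen(value) + 1);
    strcpy(chunk, value);
    return chunk;
}

int
main(void)
{
    char *param_exec = serialize_round("first value");

    /* ExecParallelReinitialize(): free last round, serialize the current. */
    free(param_exec);
    param_exec = serialize_round("second, possibly much longer value");

    printf("round 2 payload: %s\n", param_exec);
    free(param_exec);
    return 0;
}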
@@ -1046,6 +1223,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 
     /* Special executor initialization steps for parallel workers */
     queryDesc->planstate->state->es_query_dsa = area;
+    if (DsaPointerIsValid(fpes->param_exec))
+    {
+        char       *paramexec_space;
+
+        paramexec_space = dsa_get_address(area, fpes->param_exec);
+        RestoreParamExecParams(paramexec_space, queryDesc->estate);
+
+    }
     ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
     /* Pass down any tuple bound */
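Taken together, the hunks define a small handoff protocol: the leader
publishes the dsa_pointer to the serialized parameters in the fixed-size
state (fpes->param_exec, left as InvalidDsaPointer when nothing is sent), and
each worker checks that field in ParallelQueryMain() and restores the values
before plan-node initialization, so every PARAM_EXEC slot is populated before
any node can reference it.  A toy model of the worker-side check, with
ToyFixedState and InvalidToyPointer invented for the illustration:

#include <stdint.h>
#include <stdio.h>

#define InvalidToyPointer 0     /* plays the role of InvalidDsaPointer */

typedef struct ToyFixedState
{
    int64_t  tuples_needed;
    uint64_t param_exec;        /* plays the role of the dsa_pointer */
} ToyFixedState;

int
main(void)
{
    ToyFixedState fpes = {-1, InvalidToyPointer};

    /* Leader side: publish the chunk only when params were serialized. */
    fpes.param_exec = 0xE0001000;

    /* Worker side: restore only if the leader published anything. */
    if (fpes.param_exec != InvalidToyPointer)
        printf("restore params from chunk %#llx before node init\n",
               (unsigned long long) fpes.param_exec);
    return 0;
}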